Skip to content
This repository has been archived by the owner on Jan 5, 2025. It is now read-only.

Commit

Permalink
Merge pull request #334 from openchatai/flows-v2
Browse files Browse the repository at this point in the history
Flows v2
  • Loading branch information
gharbat authored Dec 5, 2023
2 parents c47dd3c + 007677e commit 45c61a4
Show file tree
Hide file tree
Showing 8 changed files with 418 additions and 10 deletions.
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ services:
# awslogs-stream: llm-server

volumes:
- ./llm-server:/app # Mount the host llm-server directory to /app in the container
- ./llm-server:/app
- shared_data:/app/shared_data
networks:
- opencopilot-net
Expand Down
8 changes: 4 additions & 4 deletions llm-server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@
from routes.chat.chat_controller import chat_workflow
from routes.copilot.copilot_controller import copilot
from routes.data_source.data_source_controller import datasource_workflow
from routes.prompt.prompt_template_controller import prompt_template_workflow
from routes.flow.flow_controller import flow
from routes.prompt.prompt_controller import prompt_workflow
from routes.prompt.prompt_template_controller import prompt_template_workflow
from routes.uploads.upload_controller import upload_controller
from routes.workflow.workflow_controller import workflow
from utils.config import Config
from utils.vector_store_setup import init_qdrant_collections
from shared.models.opencopilot_db import create_database_schema
from utils.get_logger import structlog


load_dotenv()

create_database_schema()

app = Flask(__name__)
app.url_map.strict_slashes = False
app.register_blueprint(workflow, url_prefix="/backend/flows")
app.register_blueprint(workflow, url_prefix="/backend/flows") # todo delete this one once the new flows are ready
app.register_blueprint(flow, url_prefix="/backend/flows-new")
app.register_blueprint(_swagger, url_prefix="/backend/swagger_api")
app.register_blueprint(chat_workflow, url_prefix="/backend/chat")
app.register_blueprint(copilot, url_prefix="/backend/copilot")
Expand Down
129 changes: 129 additions & 0 deletions llm-server/models/repository/flow_repo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
from typing import Optional, Type

from opencopilot_db import engine
from sqlalchemy.orm import sessionmaker

from shared.models.opencopilot_db.flow import Flow
from shared.models.opencopilot_db.flow_variables import FlowVariable

Session = sessionmaker(bind=engine)


def create_flow(chatbot_id: str, name: str, payload: dict, description: str = None) -> Flow:
"""
Creates a new flow record in the database.
Args:
description:
payload:
chatbot_id: The ID of the chatbot associated with the flow.
name: The name of the flow.
Returns:
The newly created Flow object.
"""
with Session() as session:
flow = Flow(chatbot_id=chatbot_id, name=name, payload=payload, description=description)
session.add(flow)
session.commit()
session.refresh(flow) # Refresh the instance to load any unloaded attributes
return flow


def update_flow(flow_id: str, name: str, payload: dict, description: str) -> Optional[Flow]:
"""
Updates an existing flow record in the database.
Args:
flow_id: The ID of the flow to update.
name: The new name of the flow.
payload: The new payload for the flow.
description: The new description of the flow.
Returns:
The updated Flow object, or None if not found.
"""
with Session() as session:
flow = session.query(Flow).filter(Flow.id == flow_id).first()
if flow:
flow.name = name
flow.payload = payload
flow.description = description
session.commit()
session.refresh(flow)
return flow
return None


def get_all_flows_for_bot(bot_id: str) -> list[Type[Flow]]:
"""
Retrieves all flows for a given bot from the database.
Args:
bot_id: The ID of the bot.
Returns:
A list of Flow objects.
"""
with Session() as session:
flows = session.query(Flow).filter(Flow.chatbot_id == bot_id).all()
return flows


def get_flow_by_id(flow_id: str) -> Optional[Flow]:
"""
Retrieves a specific flow by its ID from the database.
Args:
flow_id: The ID of the flow.
Returns:
The Flow object if found, otherwise None.
"""
with Session() as session:
return session.query(Flow).filter(Flow.id == str(flow_id)).first()


def get_variables_for_flow(flow_id: str) -> list[Type[FlowVariable]]:
"""
Retrieves all variables for a specific flow from the database.
Args:
flow_id: The ID of the flow.
Returns:
A list of FlowVariable objects.
"""
with Session() as session:
return session.query(FlowVariable).filter(FlowVariable.flow_id == flow_id).all()


def add_or_update_variable_in_flow(bot_id: str, flow_id: str, name: str, value: str, runtime_override_key: str = None,
runtime_override_action_id: str = None) -> FlowVariable:
"""
Adds a new variable to a flow or updates it if it already exists.
Args:
bot_id:
runtime_override_key:
runtime_override_action_id:
flow_id: The ID of the flow.
name: The name of the variable.
value: The value of the variable.
Returns:
The updated or newly created FlowVariable object.
"""
with Session() as session:
variable = session.query(FlowVariable).filter_by(bot_id=bot_id, flow_id=flow_id, name=name,
runtime_override_action_id=runtime_override_action_id,
runtime_override_key=runtime_override_key).first()
if variable:
variable.value = value
else:
variable = FlowVariable(bot_id=bot_id, flow_id=flow_id, name=name,
runtime_override_action_id=runtime_override_action_id,
runtime_override_key=runtime_override_key)
session.add(variable)
session.commit()
return variable
50 changes: 50 additions & 0 deletions llm-server/presenters/flow_presenters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from opencopilot_db import engine
from sqlalchemy.orm import sessionmaker

from shared.models.opencopilot_db.flow import Flow
from shared.models.opencopilot_db.flow_variables import FlowVariable

Session = sessionmaker(bind=engine)


def flow_to_dict(flow: Flow):
"""
Convert a Flow object to a dictionary, including its associated variables.
Args:
flow: The Flow object.
Returns:
A dictionary representation of the Flow, including its variables.
"""
with Session() as session:
# Query for variables associated with the flow
variables = session.query(FlowVariable).filter(FlowVariable.flow_id == flow.id).all()
variables_dict = [flow_variable_to_dict(variable) for variable in variables]

return {
"flow_id": flow.id.hex() if isinstance(flow.id, bytes) else flow.id,
"name": flow.name,
"payload": flow.payload,
"description": flow.description,
"last_saved_at": flow.updated_at.isoformat() if flow.updated_at else None,
"variables": variables_dict # Including nested variables
}


def flow_to_simplified_dict(flow: Flow):
"""Convert a Flow object to a dictionary."""
return {
"id": flow.id.hex() if isinstance(flow.id, bytes) else flow.id,
"name": flow.name,
}


def flow_variable_to_dict(variable: FlowVariable):
"""Convert a FlowVariable object to a dictionary."""
return {
"name": variable.name,
"value": variable.value,
"runtime_override_key": variable.runtime_override_key,
"runtime_override_action_id": variable.runtime_override_action_id,
}
179 changes: 179 additions & 0 deletions llm-server/routes/flow/flow_controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
from flask import Blueprint, jsonify, request

from models.repository.copilot_repo import find_one_or_fail_by_id
from models.repository.flow_repo import create_flow, get_all_flows_for_bot, get_flow_by_id, get_variables_for_flow, \
add_or_update_variable_in_flow, update_flow
from presenters.flow_presenters import flow_to_dict, flow_variable_to_dict
from utils.db import Database

db_instance = Database()
mongo = db_instance.get_db()

flow = Blueprint("flow", __name__)


@flow.route("/bot/<bot_id>", methods=["GET"])
def get_all_flows_api(bot_id: str):
"""
API endpoint to retrieve all flows for a given bot.
Args:
bot_id: The ID of the bot.
Returns:
A Flask response object with a list of Flow objects as dictionaries.
"""
try:
flows = get_all_flows_for_bot(bot_id)
flows_dict = [flow_to_dict(flow) for flow in
flows] # Assuming flow_to_dict is a function to convert Flow objects to dictionaries
return jsonify(flows_dict), 200
except Exception as e:
# Log the exception here
print(f"Error retrieving flows for bot ID {bot_id}: {e}")
# Return an error response
return jsonify({"error": "Failed to retrieve flows"}), 500


@flow.route("/bot/<bot_id>", methods=["POST"])
def create_flow_api(bot_id: str):
"""
API endpoint to create a new flow record.
Args:
bot_id: The ID of the chatbot associated with the flow.
Returns:
A Flask response object with the newly created Flow object as a dictionary.
"""
try:
data = request.json
if not data:
return jsonify({"error": "No data provided"}), 400

name = data.get("name")
description = data.get("description", None)
payload = data.get("blocks", {}) # todo validation
if not name:
return jsonify({"error": "Missing required field: 'name'"}), 400

flow = create_flow(bot_id, name, payload, description)
return jsonify(flow_to_dict(flow)), 201
except Exception as e:
# Log the exception here
print(f"Error creating flow: {e}")
# Return an error response
return jsonify({"error": "Failed to create flow. {}".format(str(e))}), 500


@flow.route("/<flow_id>", methods=["PUT"])
def update_flow_api(flow_id: str):
"""
API endpoint to update an existing flow record.
Args:
flow_id: The ID of the flow to be updated.
Returns:
A Flask response object with the updated Flow object as a dictionary.
"""
try:
data = request.json
if not data:
return jsonify({"error": "No data provided"}), 400

name = data.get("name")
description = data.get("description", None)
payload = data.get("blocks", {}) # TODO: Add validation

if not name:
return jsonify({"error": "Missing required field: 'name'"}), 400

updated_flow = update_flow(flow_id, name, payload, description)
if updated_flow:
return jsonify(flow_to_dict(updated_flow)), 200
else:
return jsonify({"error": "Flow not found"}), 404
except Exception as e:
print(f"Error updating flow: {e}")
return jsonify({"error": "Failed to update flow. {}".format(str(e))}), 500


@flow.route("/<flow_id>", methods=["GET"])
def get_flow_by_id_api(flow_id: str):
"""
API endpoint to retrieve a specific flow by its ID.
Args:
flow_id: The ID of the flow.
Returns:
A Flask response object with the Flow object as a dictionary.
"""
try:
flow = get_flow_by_id(flow_id)
if flow:
return jsonify(flow_to_dict(flow)), 200
else:
return jsonify({"error": "Flow not found"}), 404
except Exception as e:
# Log the exception here
print(f"Error retrieving flow with ID {flow_id}: {e}")
# Return an error response
return jsonify({"error": "Failed to retrieve flow {}".format(str(e))}), 500


@flow.route("/<flow_id>/variables", methods=["GET"])
def get_flow_variables_api(flow_id: str):
"""
API endpoint to retrieve variables associated with a specific flow.
Args:
flow_id: The ID of the flow.
Returns:
A Flask response object with a list of FlowVariable objects as dictionaries.
"""
try:
flow_variables = get_variables_for_flow(flow_id)
variables_dict = [flow_variable_to_dict(variable) for variable in
flow_variables] # Assuming flow_variable_to_dict is defined
return jsonify(variables_dict), 200
except Exception as e:
# Log the exception here
print(f"Error retrieving flow variables for flow ID {flow_id}: {e}")
# Return an error response
return jsonify({"error": "Failed to retrieve flow variables {}".format(str(e))}), 500


@flow.route("/<flow_id>/variables", methods=["POST", "PUT"])
def add_variables_to_flow_api(flow_id: str):
"""
API endpoint to add or update variables in a specific flow.
Args:
flow_id: The ID of the flow.
Returns:
A Flask response object with the updated or newly created FlowVariable object as a dictionary.
"""
try:
data = request.json
name = data.get('name')
value = data.get('value')
runtime_override_key = data.get('runtime_override_key', None)
runtime_override_action_id = data.get('runtime_override_action_id', None)
copilot_id = data.get('chatbot_id')
bot = find_one_or_fail_by_id(copilot_id)

if not name or value is None:
return jsonify({"error": "Missing required fields"}), 400

variable = add_or_update_variable_in_flow(bot.id, flow_id, name, value, runtime_override_key,
runtime_override_action_id)
return jsonify({"status": "success", "data": flow_variable_to_dict(variable)}), 201
except Exception as e:
# Log the exception here
print(f"Error adding/updating variable in flow: {e}")
# Return an error response
return jsonify({"error": "Failed to add/update variable in flow {}".format(str(e))}), 500
Loading

0 comments on commit 45c61a4

Please sign in to comment.