diff --git a/gemini/evaluation/evaluate_rag_pipeline.ipynb b/gemini/evaluation/evaluate_rag_pipeline.ipynb
new file mode 100644
index 0000000000..4abe84a7d8
--- /dev/null
+++ b/gemini/evaluation/evaluate_rag_pipeline.ipynb
@@ -0,0 +1,1650 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "ur8xi4C7S06n"
+ },
+ "outputs": [],
+ "source": [
+ "# Copyright 2024 Google LLC\n",
+ "#\n",
+ "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
+ "# you may not use this file except in compliance with the License.\n",
+ "# You may obtain a copy of the License at\n",
+ "#\n",
+ "# https://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing, software\n",
+ "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
+ "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
+ "# See the License for the specific language governing permissions and\n",
+ "# limitations under the License."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "JAPoU8Sm5E6e"
+ },
+ "source": [
+ "# Evaluate generated answers from Retrieval-Augmented Generation (RAG) using Rapid Evaluation and Dataflow ML with Vertex AI pipelines\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " Open in Colab\n",
+ " \n",
+ " | \n",
+ " \n",
+ " \n",
+ " Open in Colab Enterprise\n",
+ " \n",
+ " | \n",
+ " \n",
+ " \n",
+ " Open in Workbench\n",
+ " \n",
+ " | \n",
+ " \n",
+ " \n",
+ " View on GitHub\n",
+ " \n",
+ " | \n",
+ "
"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "5da949dba939"
+ },
+ "source": [
+ "| | |\n",
+ "|-|-|\n",
+ "|Author(s) | [Ivan Nardini](https://github.com/inardini) |"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "tvgnzT1CKxrO"
+ },
+ "source": [
+ "## Overview\n",
+ "\n",
+ "This notebook shows how you can use Vertex AI Pipelines to build a GenAI Model Evaluation Batch pipeline to evaluate a question-answering task with Rapid Eval API and DataflowML.\n",
+ "\n",
+ "This tutorial uses the following Google Cloud ML services:\n",
+ "\n",
+ "- `Dataflow`\n",
+ "- `Vertex AI Rapid Eval API`\n",
+ "- `Vertex AI Pipelines`\n",
+ "\n",
+ "The steps performed include:\n",
+ "\n",
+ "- Prepare the evaluation dataset.\n",
+ "- Build validation, evaluation and visualization pipeline components.\n",
+ "- Define your pipeline using Kubeflow Pipelines DSL package.\n",
+ "- Compile your pipeline.\n",
+ "- Submit your pipeline run.\n",
+ "\n",
+ "Learn more about [Vertex AI Rapid Eval API](https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/evaluation), [Dataflow ML](https://cloud.google.com/dataflow/docs/machine-learning) and [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "61RBz8LLbxCR"
+ },
+ "source": [
+ "## Get started"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "No17Cw5hgx12"
+ },
+ "source": [
+ "### Install required packages\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "tFy3H3aPgx12"
+ },
+ "outputs": [],
+ "source": [
+ "! pip3 install --upgrade --quiet pip\n",
+ "! pip3 install --upgrade --quiet google-cloud-aiplatform\n",
+ "! pip3 install --upgrade --quiet google-cloud-pipeline-components\n",
+ "! pip3 install --upgrade --quiet pandas plotly multiprocess etils"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "R5Xep4W9lq-Z"
+ },
+ "source": [
+ "### Restart runtime (Colab only)\n",
+ "\n",
+ "To use the newly installed packages, you must restart the runtime on Google Colab."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "XRvKdaPDTznN"
+ },
+ "outputs": [],
+ "source": [
+ "if \"google.colab\" in sys.modules:\n",
+ "\n",
+ " import IPython\n",
+ "\n",
+ " app = IPython.Application.instance()\n",
+ " app.kernel.do_shutdown(True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "SbmM4z7FOBpM"
+ },
+ "source": [
+ "\n",
+ "⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️\n",
+ "
\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "dmWOrTJ3gx13"
+ },
+ "source": [
+ "### Authenticate your notebook environment (Colab only)\n",
+ "\n",
+ "Authenticate your environment on Google Colab.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "NyKGtVQjgx13"
+ },
+ "outputs": [],
+ "source": [
+ "if \"google.colab\" in sys.modules:\n",
+ "\n",
+ " from google.colab import auth\n",
+ "\n",
+ " auth.authenticate_user()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "BF1j6f9HApxa"
+ },
+ "source": [
+ "### Set Google Cloud project information\n",
+ "\n",
+ "To get started using Vertex AI, you must have an existing Google Cloud project and [enable the Vertex AI API](https://console.cloud.google.com/flows/enableapi?apiid=aiplatform.googleapis.com).\n",
+ "\n",
+ "Learn more about [setting up a project and a development environment](https://cloud.google.com/vertex-ai/docs/start/cloud-environment)."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "WReHDGG5g0XY"
+ },
+ "source": [
+ "#### Set your project ID and project number"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "oM1iC_MfAts1"
+ },
+ "outputs": [],
+ "source": [
+ "PROJECT_ID = \"[your-project-id]\" # @param {type:\"string\"}\n",
+ "\n",
+ "# Set the project id\n",
+ "! gcloud config set project {PROJECT_ID}"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "oZpm-sL8f1z_"
+ },
+ "outputs": [],
+ "source": [
+ "PROJECT_NUMBER = \"[your-project-number]\" # @param {type:\"string\"}"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "region"
+ },
+ "source": [
+ "#### Region\n",
+ "\n",
+ "You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations)."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "I6FmBV2_0fBP"
+ },
+ "outputs": [],
+ "source": [
+ "REGION = \"us-central1\" # @param {type: \"string\"}"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "zgPO1eR3CYjk"
+ },
+ "source": [
+ "#### Create a Cloud Storage bucket\n",
+ "\n",
+ "Create a storage bucket to store intermediate artifacts such as datasets."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "MzGDU7TWdts_"
+ },
+ "outputs": [],
+ "source": [
+ "BUCKET_NAME = \"your-bucket-name-{PROJECT_ID}-unique\" # @param {type:\"string\"}\n",
+ "\n",
+ "BUCKET_URI = f\"gs://{BUCKET_NAME}\" # @param {type:\"string\"}"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "NIq7R4HZCfIc"
+ },
+ "outputs": [],
+ "source": [
+ "! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI}"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "set_service_account"
+ },
+ "source": [
+ "#### Service Account and permissions\n",
+ "\n",
+ "This notebook requires a service account with the following permissions:\n",
+ "\n",
+ "- `Vertex AI User` to call Vertex API\n",
+ "- `Storage Object Admin` to read and write to your GCS bucket.\n",
+ "- `Dataflow Worker` to execute work units for a Dataflow pipeline with Compute Engine service account.\n",
+ "- `Dataflow Developer` to execute and manipulate Dataflow jobs.\n",
+ "\n",
+ "[Check out the documentation](https://cloud.google.com/iam/docs/manage-access-service-accounts#iam-view-access-sa-gcloud) to know how to grant those permissions to a single service account.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "ssUJJqXJJHgC"
+ },
+ "outputs": [],
+ "source": [
+ "SERVICE_ACCOUNT = \"[your-service-account]\" # @param {type:\"string\"}\n",
+ "\n",
+ "SERVICE_ACCOUNT = f\"{PROJECT_NUMBER}-compute@developer.gserviceaccount.com\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "wqOHg5aid6HP"
+ },
+ "outputs": [],
+ "source": [
+ "! gcloud projects add-iam-policy-binding {PROJECT_ID} \\\n",
+ " --member=serviceAccount:{SERVICE_ACCOUNT} \\\n",
+ " --role=roles/aiplatform.user\n",
+ "\n",
+ "! gcloud projects add-iam-policy-binding {PROJECT_ID} \\\n",
+ " --member=serviceAccount:{SERVICE_ACCOUNT} \\\n",
+ " --role=roles/storage.objectAdmin\n",
+ "\n",
+ "! gcloud projects add-iam-policy-binding {PROJECT_ID} \\\n",
+ " --member=serviceAccount:{SERVICE_ACCOUNT} \\\n",
+ " --role=roles/dataflow.worker\n",
+ "\n",
+ "! gcloud projects add-iam-policy-binding {PROJECT_ID} \\\n",
+ " --member=serviceAccount:{SERVICE_ACCOUNT} \\\n",
+ " --role=roles/dataflow.developer"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "EkMLbwjdNttf"
+ },
+ "source": [
+ "### Set tutorial folder and workspace\n",
+ "\n",
+ "Set a folder to collect data and any tutorial artifacts."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "s57jCgZHNwC5"
+ },
+ "outputs": [],
+ "source": [
+ "from pathlib import Path\n",
+ "\n",
+ "ROOT_PATH = Path.cwd()\n",
+ "TUTORIAL_PATH = ROOT_PATH / \"tutorial\"\n",
+ "DATA_PATH = TUTORIAL_PATH / \"data\"\n",
+ "SRC_PATH = TUTORIAL_PATH / \"src\"\n",
+ "PIPELINE_PATH = TUTORIAL_PATH / \"pipeline\"\n",
+ "EVAL_PATH = TUTORIAL_PATH / \"evaluations\"\n",
+ "\n",
+ "DATA_PATH.mkdir(parents=True, exist_ok=True)\n",
+ "SRC_PATH.mkdir(parents=True, exist_ok=True)\n",
+ "PIPELINE_PATH.mkdir(parents=True, exist_ok=True)\n",
+ "EVAL_PATH.mkdir(parents=True, exist_ok=True)\n",
+ "\n",
+ "from etils import epath\n",
+ "\n",
+ "WORKSPACE_BUCKET_URI = epath.Path(BUCKET_URI) / \"evaluate_rag\"\n",
+ "DATA_URI = WORKSPACE_BUCKET_URI / \"data\"\n",
+ "EVALUATIONS_URI = WORKSPACE_BUCKET_URI / \"evaluations\"\n",
+ "SRC_URI = WORKSPACE_BUCKET_URI / \"src\"\n",
+ "PIPELINE_ROOT_URI = WORKSPACE_BUCKET_URI / \"pipeline\"\n",
+ "TMP_URI = WORKSPACE_BUCKET_URI / \"tmp\"\n",
+ "\n",
+ "WORKSPACE_BUCKET_URI.mkdir(parents=True, exist_ok=True)\n",
+ "DATA_URI.mkdir(parents=True, exist_ok=True)\n",
+ "EVALUATIONS_URI.mkdir(parents=True, exist_ok=True)\n",
+ "SRC_URI.mkdir(parents=True, exist_ok=True)\n",
+ "PIPELINE_ROOT_URI.mkdir(parents=True, exist_ok=True)\n",
+ "TMP_URI.mkdir(parents=True, exist_ok=True)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "ZHPJ9u4mqVPZ"
+ },
+ "source": [
+ "### Import libraries\n",
+ "\n",
+ "Import the required libraries."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "TDYkOZyvqQ5N"
+ },
+ "outputs": [],
+ "source": [
+ "# General\n",
+ "from pathlib import Path\n",
+ "from typing import List, NamedTuple\n",
+ "\n",
+ "import pandas as pd\n",
+ "import plotly.graph_objects as go\n",
+ "# Model Eval (locally)\n",
+ "from google import auth\n",
+ "# Model Eval (remote)\n",
+ "from google.cloud import aiplatform\n",
+ "from google_cloud_pipeline_components.types.artifact_types import VertexDataset\n",
+ "from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp\n",
+ "from google_cloud_pipeline_components.v1.wait_gcp_resources import \\\n",
+ " WaitGcpResourcesOp\n",
+ "from IPython.display import HTML, display\n",
+ "from kfp import compiler, dsl\n",
+ "from kfp.dsl import Metrics, Output"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "j0oHPp9nDSW9"
+ },
+ "source": [
+ "### Set constants\n",
+ "\n",
+ "Set tutorial variables."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "0YXPlcwSDOpw"
+ },
+ "outputs": [],
+ "source": [
+ "INPUT_EVALUATION_DATASET_URI = \"gs://github-repo/evaluate-gemini-autosxs-custom-task/evaluation_rag_qa_dataset.jsonl\"\n",
+ "\n",
+ "OUTPUT_EVALUATION_DATASET_URI = str(EVALUATIONS_URI / \"rag_qa_eval\")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "e6pDCl4sAIvx"
+ },
+ "source": [
+ "### Helpers\n",
+ "\n",
+ "Define an helper function to print evaluation results."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "xewl9iG0ALTa"
+ },
+ "outputs": [],
+ "source": [
+ "def print_content(df: pd.DataFrame, columns: List[str], n: int = 2) -> None:\n",
+ " \"\"\"Prints specified text columns from a DataFrame.\"\"\"\n",
+ "\n",
+ " style = \"white-space: pre-wrap; width: 800px; overflow-x: auto;\"\n",
+ " selected_df = df[columns].sample(n=n)\n",
+ "\n",
+ " for _, row in selected_df.iterrows():\n",
+ " for column in columns:\n",
+ " display(\n",
+ " HTML(f\"{column}:
{row[column]}
\")\n",
+ " )\n",
+ " display(HTML(\"
\"))\n",
+ "\n",
+ "\n",
+ "def visualize_eval_qa_summary_metrics(df: pd.DataFrame) -> None:\n",
+ " \"\"\"Plot main generated answers evaluation metrics\"\"\"\n",
+ "\n",
+ " categories = [\n",
+ " \"question_answering_quality\",\n",
+ " \"question_answering_helpfulness\",\n",
+ " \"fulfillment\",\n",
+ " \"question_answering_relevance\",\n",
+ " \"groundedness\",\n",
+ " ]\n",
+ "\n",
+ " fig = go.Figure()\n",
+ "\n",
+ " for category in categories:\n",
+ " fig.add_trace(\n",
+ " go.Bar(\n",
+ " x=[category],\n",
+ " y=[\n",
+ " rapid_eval_aggregated_metrics_df[f\"{category}_score_mean\"].values[0]\n",
+ " ],\n",
+ " error_y=dict(\n",
+ " type=\"data\",\n",
+ " array=[\n",
+ " rapid_eval_aggregated_metrics_df[\n",
+ " f\"{category}_score_std\"\n",
+ " ].values[0]\n",
+ " ],\n",
+ " visible=True,\n",
+ " ),\n",
+ " name=category,\n",
+ " )\n",
+ " )\n",
+ "\n",
+ " fig.update_layout(\n",
+ " title=\"RAG Q&A Rapid Eval Scores (mean and std)\",\n",
+ " xaxis_title=\"Metric\",\n",
+ " yaxis_title=\"Score\",\n",
+ " showlegend=False,\n",
+ " )\n",
+ "\n",
+ " fig.show()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "init_aip:mbsdk"
+ },
+ "source": [
+ "### Initialize Vertex AI SDK for Python\n",
+ "\n",
+ "Initialize the Vertex AI SDK for Python for your project and corresponding bucket."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "HRwODErb5z5U"
+ },
+ "outputs": [],
+ "source": [
+ "aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_URI)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "K_hfGB-aQXOG"
+ },
+ "source": [
+ "## Evaluate a RAG application for cooking healthy dishes\n",
+ "\n",
+ "In this tutorial, you will use a RAG evaluation dataset from an GenAI application to support chef inter in cooking healthy dishes."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "6dKwv5DoAw6L"
+ },
+ "source": [
+ "### Load the RAG evaluation dataset"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "8ssX9iLhAw6L"
+ },
+ "source": [
+ "Accoding to the [Rapid Eval API](https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/evaluation), the evaluation dataset to calculate the relevant RAG evaluation metrics should contain the following columns:\n",
+ "\n",
+ "- instruction: in this case, it contains the cooking question\n",
+ "- context: in this scenario it contains some facts to answer the cooking question.\n",
+ "- prediction: the answer to the question grounded in the context. \n",
+ "\n",
+ "Check out the [Metrics bundles](https://cloud.google.com/vertex-ai/generative-ai/docs/models/rapid-evaluation#metric-bundles) documentation to know more about commonly associated metrics to question-answering task."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "u1Xx-jm-Aw6L"
+ },
+ "outputs": [],
+ "source": [
+ "eval_dataset_df = pd.read_json(INPUT_EVALUATION_DATASET_URI, lines=True)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "KX2X-InpBMgM"
+ },
+ "outputs": [],
+ "source": [
+ "eval_dataset_df.head()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "eNID607GJKzi"
+ },
+ "source": [
+ "### Evaluate the RAG application using Dataflow ML Remote Inferece with Rapid Eval API in Vertex AI Pipelines\n",
+ "\n",
+ "To build a Vertex AI Pipeline that evaluates the RAG application using Dataflow ML Remote Inferece with Rapid Eval API, you start by defining pipeline components. In this case, you have:\n",
+ "\n",
+ "- `validate_eval_dataset` component which read the evaluation dataset and run some data quality checks.\n",
+ "\n",
+ "- `eval_qa_batch_pipeline` component which uses the Kubeflow pipeline pattern of **pipeline as component** to combine the Vertex AI Pipelines `DataflowPythonJobOp` with some helpers components and provide more readable evaluation pipeline.\n",
+ "\n",
+ "- `visualize_eval_qa_metrics` component which takes the Rapid Eval API results and returns some aggregated evaluation metrics."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "pz_TWYObVSJ-"
+ },
+ "source": [
+ "#### Define the `validate_eval_dataset` component\n",
+ "\n",
+ "The `validate_eval_dataset` component reads a JSONL file from a Google Cloud Storage URI, validates the data using a defined set of rules, and then outputs two Vertex Datasets: one for valid data and another for invalid data. It leverages multiprocessing and tqdm for efficient processing and includes detailed logging for tracking the validation process. The component exits with an error code if any invalid data is found, providing clear information about the errors.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "oVuzKdbRVYim"
+ },
+ "outputs": [],
+ "source": [
+ "@dsl.component(\n",
+ " base_image=\"python:3.10\",\n",
+ " packages_to_install=[\n",
+ " \"multiprocess\",\n",
+ " \"tqdm\",\n",
+ " \"pandas\",\n",
+ " \"google-cloud-aiplatform\",\n",
+ " \"google_cloud_pipeline_components\",\n",
+ " \"etils\",\n",
+ " \"importlib_resources\",\n",
+ " ],\n",
+ ")\n",
+ "def validate_eval_dataset(\n",
+ " input_eval_dataset_uri: str,\n",
+ " valid_eval_dataset: Output[VertexDataset],\n",
+ " invalid_eval_dataset: Output[VertexDataset],\n",
+ ") -> NamedTuple(\n",
+ " \"outputs\", valid_eval_dataset_file_uri=str, invalid_eval_dataset_file_uri=str\n",
+ "):\n",
+ " \"\"\"Validate the input dataset.\"\"\"\n",
+ "\n",
+ " import logging\n",
+ " import os\n",
+ " from typing import List, Tuple\n",
+ "\n",
+ " import multiprocess as mp\n",
+ " import pandas as pd\n",
+ " from tqdm.auto import tqdm\n",
+ "\n",
+ " def validate_row(row_tuple: Tuple[int, pd.Series]) -> int | None:\n",
+ " \"\"\"Validates a single row of the DataFrame.\"\"\"\n",
+ " index, row = row_tuple\n",
+ " if row.isnull().any() or (row == \"\").any():\n",
+ " return index\n",
+ "\n",
+ " if not (\n",
+ " isinstance(row[\"instruction\"], str)\n",
+ " and isinstance(row[\"context\"], str)\n",
+ " and isinstance(row[\"prediction\"], str)\n",
+ " ):\n",
+ " return index\n",
+ "\n",
+ " return None\n",
+ "\n",
+ " def validate_dataframe(df: pd.DataFrame, num_processes: int) -> List[int]:\n",
+ " \"\"\"Validates a DataFrame using parallel processing and returns invalid row indices.\"\"\"\n",
+ "\n",
+ " with mp.Pool(processes=num_processes) as pool:\n",
+ " results = list(\n",
+ " tqdm(\n",
+ " pool.imap(validate_row, df.iterrows()),\n",
+ " total=len(df),\n",
+ " desc=\"Validating DataFrame\",\n",
+ " )\n",
+ " )\n",
+ " invalid_indices = [index for index in results if index is not None]\n",
+ " return invalid_indices\n",
+ "\n",
+ " # Set up logging\n",
+ " logging.basicConfig(level=logging.INFO)\n",
+ "\n",
+ " # Determine the maximum number of processes\n",
+ " max_processes = os.cpu_count()\n",
+ "\n",
+ " # Read the dataset\n",
+ " logging.info(f\"Reading dataset from {input_eval_dataset_uri}\")\n",
+ " input_dataset_path = input_eval_dataset_uri.replace(\"gs://\", \"/gcs/\")\n",
+ " eval_df = pd.read_json(input_dataset_path, lines=True)\n",
+ "\n",
+ " # Validate the dataset\n",
+ " logging.info(\"Validating dataset\")\n",
+ " invalid_indices = validate_dataframe(eval_df, num_processes=max_processes)\n",
+ "\n",
+ " # Save valid dataset\n",
+ " invalid_eval_dataset_file_path = invalid_eval_dataset.path + \".jsonl\"\n",
+ " valid_eval_dataset_file_path = valid_eval_dataset.path + \".jsonl\"\n",
+ "\n",
+ " if invalid_indices:\n",
+ " logging.error(f\"DataFrame is invalid! Invalid row indices: {invalid_indices}.\")\n",
+ " logging.info(f\"Saving invalid rows to {invalid_eval_dataset_file_path}\")\n",
+ " invalid_df = eval_df.iloc[invalid_indices]\n",
+ " invalid_df.to_json(invalid_eval_dataset_file_path, orient=\"records\", lines=True)\n",
+ " logging.info(f\"Saving only valid rows to {valid_eval_dataset_file_path}\")\n",
+ " valid_eval_df = eval_df.drop(index=invalid_indices)\n",
+ " valid_eval_df.to_json(\n",
+ " valid_eval_dataset_file_path, orient=\"records\", lines=True\n",
+ " )\n",
+ "\n",
+ " else:\n",
+ " # Log the valid dataset\n",
+ " logging.info(\"DataFrame is valid!\")\n",
+ " logging.info(f\"Saving valid rows to {valid_eval_dataset_file_path}\")\n",
+ " eval_df.to_json(valid_eval_dataset_file_path, orient=\"records\", lines=True)\n",
+ "\n",
+ " valid_eval_dataset.uri = valid_eval_dataset.uri + \".jsonl\"\n",
+ " invalid_eval_dataset.uri = invalid_eval_dataset.uri + \".jsonl\"\n",
+ " component_outputs = NamedTuple(\n",
+ " \"outputs\", valid_eval_dataset_file_uri=str, invalid_eval_dataset_file_uri=str\n",
+ " )\n",
+ " return component_outputs(valid_eval_dataset.uri, invalid_eval_dataset.uri)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "orRafjJ6Vl_e"
+ },
+ "source": [
+ "#### Define the `eval_qa_batch_pipeline` component\n",
+ "\n",
+ "The `eval_qa_batch_pipeline` component uses `DataflowPythonJobOp` to calculate RAG Q&A evaluation metrics using Rapid Eval API. The `DataflowPythonJobOp` operator is used within Vertex AI Pipelines to process data using Apache Beam. It submits Python-based Beam jobs to Dataflow for execution. The Dataflow Runner handles code execution, uploading it and its dependencies to Cloud Storage before creating a job that runs your Beam pipeline on Dataflow.\n",
+ "\n",
+ "The `DataflowPythonJobOp` component takes the following parameters:\n",
+ "\n",
+ "- `project_id`: The project ID.\n",
+ "- `location`: The region.\n",
+ "- `python_module_path`: The Cloud Storage location of the Apache Beam pipeline to run RAG Q&A evaluation task.\n",
+ "- `temp_location`: The Cloud Storage temporary file workspace for the Apache Beam pipeline.\n",
+ "- `requirements_file_path`: The required Python modules to install.\n",
+ "- `args`: The arguments to pass to the Apache Beam pipeline.\n",
+ "\n",
+ "Learn more about [Google Cloud Pipeline Component for Dataflow.](https://google-cloud-pipeline-components.readthedocs.io/en/google-cloud-pipeline-components-0.2.0/google_cloud_pipeline_components.experimental.dataflow.html)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "viypdB6mB1k3"
+ },
+ "source": [
+ "##### Write the Apache Beam pipeline module\n",
+ "\n",
+ "First, you write a Python script that utilizes Apache Beam to process a JSONL file containing model predictions and evaluate them using the Rapid Eval API. The script takes input and output file paths, a desired metric to evaluate, and optional batch size as command-line arguments. It then reads the input data, prepares metric requests, and sends them to the Rapid Eval API for evaluation. The script handles multiple metrics simultaneously, grouping the results by a unique ID, and writes the final evaluations to the specified output JSONL file."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "E_sCWEoVCdMs"
+ },
+ "outputs": [],
+ "source": [
+ "custom_inference_module = '''\n",
+ "\n",
+ "from __future__ import absolute_import\n",
+ "\n",
+ "# General libraries\n",
+ "import argparse\n",
+ "import json\n",
+ "import hashlib\n",
+ "from etils import epath\n",
+ "from pathlib import Path\n",
+ "import pandas as pd\n",
+ "from typing import List, Dict, Tuple\n",
+ "from google import auth\n",
+ "from google.auth.transport import requests as google_auth_requests\n",
+ "from google.api_core import exceptions as google_exceptions\n",
+ "from requests import exceptions\n",
+ "import logging\n",
+ "import backoff\n",
+ "\n",
+ "# Apache Beam libraries\n",
+ "import apache_beam as beam\n",
+ "from apache_beam.io import ReadFromText, WriteToText\n",
+ "from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions\n",
+ "from apache_beam.ml.inference.base import ModelHandler, RunInference\n",
+ "\n",
+ "# Library settings\n",
+ "logging.getLogger().setLevel(logging.INFO)\n",
+ "\n",
+ "# ----------------------------------------------------------------------------\n",
+ "# Helper Functions\n",
+ "# ----------------------------------------------------------------------------\n",
+ "\n",
+ "def parse_element(element: str) -> Dict:\n",
+ " \"\"\"\n",
+ " Parses a JSON string into a dictionary.\n",
+ " \"\"\"\n",
+ " return json.loads(element)\n",
+ "\n",
+ "def unparse_element(element: Dict) -> str:\n",
+ " \"\"\"\n",
+ " Unparses a dictionary into a JSON string.\n",
+ " \"\"\"\n",
+ " return json.dumps(element)\n",
+ "\n",
+ "def generate_id(element: Dict) -> Tuple:\n",
+ " \"\"\"\n",
+ " Generates a unique ID based on the values in string columns.\n",
+ " \"\"\"\n",
+ "\n",
+ " instruction = element.get(\"instruction\", \"\")\n",
+ " context = element.get(\"context\", \"\")\n",
+ " prediction = element.get(\"prediction\", \"\")\n",
+ " combined_string = \"\"\n",
+ "\n",
+ " if instruction:\n",
+ " combined_string = instruction + context + prediction\n",
+ " else:\n",
+ " combined_string = context + prediction\n",
+ "\n",
+ " hash_object = hashlib.sha256(combined_string.encode())\n",
+ " id = hash_object.hexdigest()\n",
+ " return (id, element)\n",
+ "\n",
+ "def get_metric_request(element: Tuple, metric_name: str) -> Tuple:\n",
+ " \"\"\"\n",
+ " Formats the Rapid Eval metric request based on the given metric name.\n",
+ " Args:\n",
+ " element: A JSON string containing the instruction, context, and prediction.\n",
+ " metric_name: The name of the Rapid Eval metric to evaluate.\n",
+ "\n",
+ " Returns:\n",
+ " A dictionary formatted as a Rapid Eval metric request.\n",
+ " \"\"\"\n",
+ "\n",
+ " id, element = element\n",
+ "\n",
+ " if metric_name == \"groundedness\":\n",
+ " input_data = {\n",
+ " f\"{metric_name}_input\": {\n",
+ " \"instance\": {\n",
+ " \"context\": element.get(\"context\"),\n",
+ " \"prediction\": element.get(\"prediction\")\n",
+ " },\n",
+ " \"metric_spec\": {}\n",
+ " }\n",
+ " }\n",
+ " elif metric_name == \"fulfillment\":\n",
+ " input_data = {\n",
+ " f\"{metric_name}_input\": {\n",
+ " \"instance\": {\n",
+ " \"instruction\": element.get(\"instruction\"),\n",
+ " \"prediction\": element.get(\"prediction\")\n",
+ " },\n",
+ " \"metric_spec\": {}\n",
+ " }\n",
+ " }\n",
+ " else:\n",
+ " input_data = {\n",
+ " f\"{metric_name}_input\": {\n",
+ " \"instance\": {\n",
+ " \"instruction\": element.get(\"instruction\"),\n",
+ " \"context\": element.get(\"context\"),\n",
+ " \"prediction\": element.get(\"prediction\")\n",
+ " },\n",
+ " \"metric_spec\": {}\n",
+ " }\n",
+ " }\n",
+ "\n",
+ " return id, input_data\n",
+ "\n",
+ "\n",
+ "def get_evaluation_output(element: Tuple) -> Dict:\n",
+ " \"\"\"Parses Rapid Eval output data.\"\"\"\n",
+ "\n",
+ " _, evaluations = element\n",
+ "\n",
+ " result = {}\n",
+ "\n",
+ " for _, metric_data in evaluations.items():\n",
+ " result['instruction'] = metric_data[0].get('instruction', '')\n",
+ " result['context'] = metric_data[0].get('context', '')\n",
+ " result['prediction'] = metric_data[0].get('prediction', '')\n",
+ " if all(result != '' for v in result.values()):\n",
+ " break\n",
+ "\n",
+ " for key, metric_data in evaluations.items():\n",
+ " for sub_key, sub_value in metric_data[0].items():\n",
+ " if sub_key not in [\"instruction\", \"context\", \"prediction\"]:\n",
+ " result[f\"{key}_{sub_key}\"] = sub_value\n",
+ "\n",
+ " return result\n",
+ "\n",
+ "# ----------------------------------------------------------------------------\n",
+ "# Custom Model Handler for RapidEval API\n",
+ "# ----------------------------------------------------------------------------\n",
+ "\n",
+ "class RapidEvalAPIModelHandler(ModelHandler):\n",
+ " \"\"\"DoFn that accepts an input text, format it as Rapid Eval API request\n",
+ " and sends that it to the API for remote inference\"\"\"\n",
+ "\n",
+ " def __init__(self, project, region, base_uri) -> None:\n",
+ " \"\"\"Initialize the model handler.\"\"\"\n",
+ " self.creds, _ = auth.default(scopes=[\"https://www.googleapis.com/auth/cloud-platform\"])\n",
+ " self.uri = base_uri.format(region, project, region)\n",
+ "\n",
+ " def load_model(self):\n",
+ " \"\"\"Create an authorized session for API requests.\"\"\"\n",
+ " return google_auth_requests.AuthorizedSession(self.creds)\n",
+ "\n",
+ " def run_inference(self, batch, model, inference):\n",
+ " \"\"\"Send the request to the Rapid Eval API and extract the relevant result.\"\"\"\n",
+ "\n",
+ " @backoff.on_exception(backoff.expo,\n",
+ " exceptions.HTTPError,\n",
+ " max_tries=5, max_time=320)\n",
+ " def send_request(self, element):\n",
+ " \"\"\"Sends a request to the Rapid Eval API with exponential backoff.\"\"\"\n",
+ " response = model.post(self.uri, json=element)\n",
+ " response.raise_for_status()\n",
+ " return response\n",
+ "\n",
+ " evaluations = []\n",
+ " try:\n",
+ " for batch_item in batch:\n",
+ " id, element = batch_item\n",
+ " instance = element[next(iter(element))].get('instance', '')\n",
+ " response = send_request(self, element)\n",
+ " result = response.json()\n",
+ " result_key = next(iter(result))\n",
+ " output_data = {**instance, **result[result_key]}\n",
+ " evaluations.append((id, output_data))\n",
+ " return evaluations\n",
+ "\n",
+ " except (google_exceptions.GoogleAPICallError, KeyError, json.JSONDecodeError) as e:\n",
+ " logging.error(f\"Error processing batch: {e}\")\n",
+ " return evaluations\n",
+ "\n",
+ "# ----------------------------------------------------------------------------\n",
+ "# Main Apache Beam Pipeline\n",
+ "# ----------------------------------------------------------------------------\n",
+ "\n",
+ "def run(argv=None):\n",
+ "\n",
+ " # ----------------------------------------------------------------------------\n",
+ " # Parse command line arguments and set up pipeline options\n",
+ " # ----------------------------------------------------------------------------\n",
+ "\n",
+ " parser = argparse.ArgumentParser()\n",
+ " parser.add_argument(\"--input\", dest=\"input\", required=True, help=\"Input JSONL file to process.\")\n",
+ " parser.add_argument(\"--output\", dest=\"output\", required=True, help=\"Output JSONL file to write results to.\")\n",
+ "\n",
+ " known_args, pipeline_args = parser.parse_known_args(argv)\n",
+ "\n",
+ " pipeline_options = PipelineOptions(pipeline_args)\n",
+ " project = pipeline_options.get_all_options()['project']\n",
+ " region = pipeline_options.get_all_options()['region']\n",
+ "\n",
+ " base_uri = \"https://{}-aiplatform.googleapis.com/v1beta1/projects/{}/locations/{}:evaluateInstances\"\n",
+ " pipeline_options.view_as(SetupOptions).save_main_session = True\n",
+ "\n",
+ " # ----------------------------------------------------------------------------\n",
+ " # Run Apache Beam pipeline\n",
+ " # ----------------------------------------------------------------------------\n",
+ "\n",
+ " with beam.Pipeline(options=pipeline_options) as p:\n",
+ "\n",
+ " input_data = (\n",
+ " p\n",
+ " | \"ReadRecords\" >> ReadFromText(known_args.input)\n",
+ " | \"ParseInputJSON\" >> beam.Map(parse_element)\n",
+ " | \"AddKey\" >> beam.Map(generate_id)\n",
+ " )\n",
+ "\n",
+ " question_answering_quality_evaluations = (\n",
+ " input_data\n",
+ " | \"PrepareQAQualityInput\" >> beam.Map(get_metric_request, metric_name='question_answering_quality')\n",
+ " | \"RunQAQualityEvaluation\" >> RunInference(model_handler=RapidEvalAPIModelHandler(project, region, base_uri))\n",
+ " )\n",
+ "\n",
+ " question_answering_helpfulness_evaluations = (\n",
+ " input_data\n",
+ " | \"PrepareQAHelpfulnessInput\" >> beam.Map(get_metric_request, metric_name='question_answering_helpfulness')\n",
+ " | \"RunQAHelpfulnessEvaluation\" >> RunInference(model_handler=RapidEvalAPIModelHandler(project, region, base_uri))\n",
+ " )\n",
+ "\n",
+ " question_answering_relevance_evaluations = (\n",
+ " input_data\n",
+ " | \"PrepareQARelvanceInput\" >> beam.Map(get_metric_request, metric_name='question_answering_relevance')\n",
+ " | \"RunQARelvanceEvaluation\" >> RunInference(model_handler=RapidEvalAPIModelHandler(project, region, base_uri))\n",
+ " )\n",
+ "\n",
+ " groundedness_evaluations = (\n",
+ " input_data\n",
+ " | \"PrepareGroundednessInput\" >> beam.Map(get_metric_request, metric_name='groundedness')\n",
+ " | \"RunGroundednessEvaluation\" >> RunInference(model_handler=RapidEvalAPIModelHandler(project, region, base_uri))\n",
+ " )\n",
+ "\n",
+ " fulfillment_evaluations = (\n",
+ " input_data\n",
+ " | \"PrepareFulfillmentInput\" >> beam.Map(get_metric_request, metric_name='fulfillment')\n",
+ " | \"RunFulfillmentEvaluation\" >> RunInference(model_handler=RapidEvalAPIModelHandler(project, region, base_uri))\n",
+ " )\n",
+ "\n",
+ " results = {\n",
+ " \"question_answering_quality\": question_answering_quality_evaluations,\n",
+ " \"question_answering_helpfulness\": question_answering_helpfulness_evaluations,\n",
+ " \"question_answering_relevance\": question_answering_relevance_evaluations,\n",
+ " \"groundedness\": groundedness_evaluations,\n",
+ " \"fulfillment\": fulfillment_evaluations\n",
+ " }\n",
+ "\n",
+ " output_data = (\n",
+ " results\n",
+ " | 'GroupEvalbyKey' >> beam.CoGroupByKey()\n",
+ " | 'PrepareEvaluations' >> beam.Map(get_evaluation_output)\n",
+ " | 'UnparseOutputData' >> beam.Map(unparse_element)\n",
+ " )\n",
+ " output_data | \"WriteEvaluations\" >> WriteToText(known_args.output, file_name_suffix='.jsonl')\n",
+ "\n",
+ "if __name__ == \"__main__\":\n",
+ " run()\n",
+ "'''\n",
+ "\n",
+ "with epath.Path(SRC_URI / \"main.py\").open(\"w\") as f:\n",
+ " f.write(custom_inference_module)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "Vcpt_UikJWni"
+ },
+ "source": [
+ "##### Write the requirements\n",
+ "\n",
+ "Next, create the `requirements.txt` file to specify Python modules that are required."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "v7keHo9CJTPI"
+ },
+ "outputs": [],
+ "source": [
+ "requirements_file = \"\"\"\n",
+ "future\n",
+ "importlib_resources\n",
+ "etils\n",
+ "backoff\n",
+ "apache-beam\n",
+ "google-cloud-aiplatform\n",
+ "google-auth>=2.26.1\n",
+ "\"\"\"\n",
+ "\n",
+ "with epath.Path(SRC_URI / \"requirements.txt\").open(\"w\") as f:\n",
+ " f.write(requirements_file)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "E1OwI_DhJvzg"
+ },
+ "source": [
+ "##### Write the setup.py\n",
+ "\n",
+ "Next, create the `setup.py` file to specify Python modules that are required to be installed for executing the Dataflow workers.\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "MCC9yig5Jz3k"
+ },
+ "outputs": [],
+ "source": [
+ "setup_module = \"\"\"\n",
+ "import setuptools\n",
+ "\n",
+ "REQUIRED_PACKAGES = [\n",
+ " 'future',\n",
+ " 'importlib_resources',\n",
+ " 'etils',\n",
+ " 'backoff',\n",
+ " 'google-cloud-aiplatform',\n",
+ " 'google-auth>=2.26.1'\n",
+ "]\n",
+ "PACKAGE_NAME = 'eval_qa_rapid_api'\n",
+ "PACKAGE_VERSION = '0.0.1'\n",
+ "setuptools.setup(\n",
+ " name=PACKAGE_NAME,\n",
+ " version=PACKAGE_VERSION,\n",
+ " description='Demo for evaluating question answering using Rapid Eval API',\n",
+ " install_requires=REQUIRED_PACKAGES,\n",
+ " author=\"inardini@google.com\",\n",
+ " packages=setuptools.find_packages()\n",
+ ")\n",
+ "\"\"\"\n",
+ "\n",
+ "with epath.Path(SRC_URI / \"setup.py\").open(\"w\") as f:\n",
+ " f.write(setup_module)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "sKcHFp2IV18w"
+ },
+ "source": [
+ "##### Define the evaluation pipeline component\n",
+ "\n",
+ "Finally, you assemble the `eval_qa_batch_pipeline` pipeline component. The pipeline prepares Dataflow ML job arguments (`prepare_args_op` component), runs the Dataflow ML job, waits for the job to complete (`DataflowPythonJobOp`and `WaitGcpResourcesOp` components), and returns the output evaluation dataset file URI (`get_output_eval_file_uri_op` component)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "asrC9-5UV78E"
+ },
+ "outputs": [],
+ "source": [
+ "@dsl.component(base_image=\"python:3.10\")\n",
+ "def prepare_arguments(\n",
+ " valid_eval_dataset_file_uri: str, output_eval_dataset_uri: str, args: str\n",
+ ") -> NamedTuple(\"outputs\", args=list):\n",
+ " \"\"\"Parse command line arguments.\"\"\"\n",
+ " import json\n",
+ "\n",
+ " args = json.loads(args)\n",
+ " args.insert(0, \"--input\")\n",
+ " args.insert(1, valid_eval_dataset_file_uri)\n",
+ " args.insert(2, \"--output\")\n",
+ " args.insert(3, output_eval_dataset_uri)\n",
+ " component_outputs = NamedTuple(\"outputs\", args=list)\n",
+ " return component_outputs(args)\n",
+ "\n",
+ "\n",
+ "@dsl.component(\n",
+ " base_image=\"python:3.10\",\n",
+ " packages_to_install=[\n",
+ " \"google_cloud_pipeline_components\",\n",
+ " \"etils\",\n",
+ " \"importlib_resources\",\n",
+ " ],\n",
+ ")\n",
+ "def get_output_eval_file_uri(\n",
+ " output_eval_dataset_uri: str, output_eval_dataset: Output[VertexDataset]\n",
+ ") -> NamedTuple(\"outputs\", output_eval_dataset_file_uri=str):\n",
+ " \"\"\"Get the output file path.\"\"\"\n",
+ " import logging\n",
+ "\n",
+ " from etils import epath\n",
+ "\n",
+ " # Set up logging\n",
+ " logging.basicConfig(level=logging.INFO)\n",
+ "\n",
+ " # Read the dataset\n",
+ " logging.info(f\"Reading dataset from {output_eval_dataset_uri}\")\n",
+ " output_eval_dataset_path = output_eval_dataset_uri.replace(\"gs://\", \"/gcs/\")\n",
+ " output_eval_dataset_file_paths = [\n",
+ " str(p)\n",
+ " for p in epath.Path(epath.Path(output_eval_dataset_path).parents[0]).glob(\n",
+ " \"*.jsonl\"\n",
+ " )\n",
+ " ]\n",
+ "\n",
+ " # Prepare the metrics file\n",
+ " output_eval_dataset_file_path = output_eval_dataset.path + \".jsonl\"\n",
+ "\n",
+ " logging.info(f\"Writing metrics file at {output_eval_dataset_file_path}\")\n",
+ " with open(output_eval_dataset_file_path, \"w\") as outfile:\n",
+ " for output_eval_dataset_file in output_eval_dataset_file_paths:\n",
+ " with open(output_eval_dataset_file, \"r\") as interfile:\n",
+ " for line in interfile:\n",
+ " outfile.write(line)\n",
+ "\n",
+ " output_eval_dataset.uri = output_eval_dataset.uri + \".jsonl\"\n",
+ " component_outputs = NamedTuple(\"outputs\", output_eval_dataset_file_uri=str)\n",
+ " return component_outputs(output_eval_dataset.uri)\n",
+ "\n",
+ "\n",
+ "@dsl.pipeline\n",
+ "def eval_qa_batch_pipeline(\n",
+ " valid_eval_dataset_file_uri: str,\n",
+ " output_eval_dataset_uri: str,\n",
+ " args: str,\n",
+ " requirements_file_path: str,\n",
+ " python_file_path: str,\n",
+ " temp_location: str,\n",
+ " project_id: str,\n",
+ " location: str,\n",
+ " staging_dir: str,\n",
+ ") -> str:\n",
+ "\n",
+ " # Prepare the Dataflow ML job arguments\n",
+ " prepare_args_op = prepare_arguments(\n",
+ " valid_eval_dataset_file_uri=valid_eval_dataset_file_uri,\n",
+ " output_eval_dataset_uri=output_eval_dataset_uri,\n",
+ " args=args,\n",
+ " ).set_display_name(\"Prepare arguments\")\n",
+ "\n",
+ " # Run the Dataflow ML job\n",
+ " dataflow_python_op = DataflowPythonJobOp(\n",
+ " project=project_id,\n",
+ " location=location,\n",
+ " python_module_path=python_file_path,\n",
+ " temp_location=temp_location,\n",
+ " requirements_file_path=requirements_file_path,\n",
+ " args=prepare_args_op.outputs[\"args\"],\n",
+ " ).set_display_name(\"Prepare Dataflow ML Evaluation Job\")\n",
+ "\n",
+ " wait_op = (\n",
+ " WaitGcpResourcesOp(gcp_resources=dataflow_python_op.outputs[\"gcp_resources\"])\n",
+ " .set_display_name(\"Run Dataflow ML Evaluation Job\")\n",
+ " .after(dataflow_python_op)\n",
+ " )\n",
+ "\n",
+ " # Get the output metrics uri\n",
+ " get_output_eval_file_uri_op = (\n",
+ " get_output_eval_file_uri(output_eval_dataset_uri=output_eval_dataset_uri)\n",
+ " .set_display_name(\"Get evaluation file\")\n",
+ " .after(wait_op)\n",
+ " )\n",
+ "\n",
+ " return get_output_eval_file_uri_op.outputs[\"output_eval_dataset_file_uri\"]"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "eXPiCuxbWLSM"
+ },
+ "source": [
+ "#### Define the `visualize_eval_qa_metrics` component\n",
+ "\n",
+ "The `visualize_eval_qa_metrics` component calculates the mean and standard deviation for each score column in the Rapid Eval API resulting dataset using multiprocessing for efficiency. The component logs these metrics and saves them into a JSONL file."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "FvwQjTgmULDu"
+ },
+ "outputs": [],
+ "source": [
+ "@dsl.component(\n",
+ " base_image=\"python:3.10\",\n",
+ " packages_to_install=[\n",
+ " \"multiprocess\",\n",
+ " \"tqdm\",\n",
+ " \"numpy\",\n",
+ " \"pandas\",\n",
+ " \"google-cloud-aiplatform\",\n",
+ " \"google_cloud_pipeline_components\",\n",
+ " \"etils\",\n",
+ " \"importlib_resources\",\n",
+ " ],\n",
+ ")\n",
+ "def visualize_eval_qa_metrics(\n",
+ " output_eval_dataset_file_uri: str, output_eval_summary_metrics: Output[Metrics]\n",
+ ") -> NamedTuple(\"outputs\", output_eval_summary_metrics_file_uri=str):\n",
+ " \"\"\"Visualize the evaluation metrics.\"\"\"\n",
+ "\n",
+ " import json\n",
+ " import logging\n",
+ " import os\n",
+ " from typing import Dict, Tuple\n",
+ "\n",
+ " import multiprocess as mp\n",
+ " import numpy as np\n",
+ " import pandas as pd\n",
+ " from tqdm.auto import tqdm\n",
+ "\n",
+ " def calculate_stats(score_column: pd.Series) -> Tuple[float, float]:\n",
+ " \"\"\"Calculates mean and standard deviation for a given score column.\"\"\"\n",
+ " return np.mean(score_column), np.std(score_column)\n",
+ "\n",
+ " def get_metrics(\n",
+ " output_eval_dataset_file_uri: str, num_processes: int\n",
+ " ) -> Dict[str, float]:\n",
+ " \"\"\"Get metrics with mean and standard deviation for score columns.\"\"\"\n",
+ " output_eval_dataset_file_path = output_eval_dataset_file_uri.replace(\n",
+ " \"gs://\", \"/gcs/\"\n",
+ " )\n",
+ " eval_result_df = pd.read_json(output_eval_dataset_file_path, lines=True)\n",
+ " score_columns = [\n",
+ " col for col in eval_result_df.columns if col.endswith(\"_score\")\n",
+ " ]\n",
+ "\n",
+ " with mp.Pool(processes=num_processes) as pool:\n",
+ " eval_report_results = list(\n",
+ " tqdm(\n",
+ " pool.imap(\n",
+ " calculate_stats, [eval_result_df[col] for col in score_columns]\n",
+ " ),\n",
+ " total=len(score_columns),\n",
+ " desc=\"Calculating statistics\",\n",
+ " )\n",
+ " )\n",
+ "\n",
+ " metrics = {}\n",
+ " for col, (mean, std_dev) in zip(score_columns, eval_report_results):\n",
+ " metrics[f\"{col}_mean\"] = round(float(mean), 3)\n",
+ " metrics[f\"{col}_std\"] = round(float(std_dev), 3)\n",
+ " return metrics\n",
+ "\n",
+ " # Set up logging\n",
+ " logging.basicConfig(level=logging.INFO)\n",
+ "\n",
+ " # Determine the maximum number of processes\n",
+ " max_processes = os.cpu_count()\n",
+ "\n",
+ " # Generate metadata\n",
+ " logging.info(\"Generating metadata\")\n",
+ " metrics = get_metrics(output_eval_dataset_file_uri, num_processes=max_processes)\n",
+ "\n",
+ " # Log metrics\n",
+ " logging.info(\"Logging metrics\")\n",
+ " for key, value in metrics.items():\n",
+ " # Assuming output_table_metrics is a custom logging object\n",
+ " output_eval_summary_metrics.log_metric(key, value)\n",
+ "\n",
+ " # Save aggregated metrics\n",
+ " output_eval_summary_metrics_file_path = output_eval_summary_metrics.path + \".jsonl\"\n",
+ " logging.info(f\"Writing metrics file at {output_eval_summary_metrics_file_path}\")\n",
+ " with open(output_eval_summary_metrics_file_path, \"w\") as outfile:\n",
+ " json.dump(metrics, outfile)\n",
+ "\n",
+ " output_eval_summary_metrics.uri = output_eval_summary_metrics.uri + \".jsonl\"\n",
+ " component_outputs = NamedTuple(\"outputs\", output_eval_summary_metrics_file_uri=str)\n",
+ " return component_outputs(output_eval_summary_metrics.uri)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "oUthAmmUlfI4"
+ },
+ "source": [
+ "#### Define your workflow using Kubeflow Pipelines DSL package\n",
+ "\n",
+ "You assemble the pipeline using the defined components according to the evaluation workflow."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "YrUmsKifRXAH"
+ },
+ "outputs": [],
+ "source": [
+ "@dsl.pipeline(\n",
+ " name=\"eval-rag-batch-pipeline\",\n",
+ " description=\"Evaluating question answering using Rapid Eval API\",\n",
+ ")\n",
+ "def pipeline(\n",
+ " input_eval_dataset_uri: str,\n",
+ " output_eval_dataset_uri: str,\n",
+ " args: str,\n",
+ " requirements_file_path: str,\n",
+ " python_file_path: str,\n",
+ " temp_location: str,\n",
+ " project_id: str,\n",
+ " location: str,\n",
+ " staging_dir: str,\n",
+ "):\n",
+ "\n",
+ " # Validate the input dataset\n",
+ " validate_eval_dataset_op = validate_eval_dataset(\n",
+ " input_eval_dataset_uri=input_eval_dataset_uri\n",
+ " ).set_display_name(\"Validate RAG Eval dataset\")\n",
+ "\n",
+ " # Run the evaluation subpipeline\n",
+ " eval_qa_batch_pipeline_op = (\n",
+ " eval_qa_batch_pipeline(\n",
+ " valid_eval_dataset_file_uri=validate_eval_dataset_op.outputs[\n",
+ " \"valid_eval_dataset_file_uri\"\n",
+ " ], # validate_eval_dataset_op.outputs['input_dataset_uri'],\n",
+ " output_eval_dataset_uri=output_eval_dataset_uri,\n",
+ " args=args,\n",
+ " requirements_file_path=requirements_file_path,\n",
+ " python_file_path=python_file_path,\n",
+ " temp_location=temp_location,\n",
+ " project_id=project_id,\n",
+ " location=location,\n",
+ " staging_dir=staging_dir,\n",
+ " )\n",
+ " .set_display_name(\"RAG Q&A Evaluation\")\n",
+ " .after(validate_eval_dataset_op)\n",
+ " )\n",
+ "\n",
+ " # Read the dataset and print some aggregated metrics (average and standard dev)\n",
+ " _ = (\n",
+ " visualize_eval_qa_metrics(\n",
+ " output_eval_dataset_file_uri=eval_qa_batch_pipeline_op.output\n",
+ " )\n",
+ " .set_display_name(\"Visualize RAG Eval metrics\")\n",
+ " .after(eval_qa_batch_pipeline_op)\n",
+ " )"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "WAipu47flsO3"
+ },
+ "source": [
+ "#### Compile your pipeline into a YAML file\n",
+ "\n",
+ "After the workflow of your pipeline is defined, you compile the pipeline into YAML format."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "SGg3aQ5vKRwJ"
+ },
+ "outputs": [],
+ "source": [
+ "compiler.Compiler().compile(\n",
+ " pipeline_func=pipeline, package_path=str(PIPELINE_PATH) + \"/eval_pipeline.json\"\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "gzLjYFACltf9"
+ },
+ "source": [
+ "#### Submit your pipeline run\n",
+ "\n",
+ "After the workflow of your pipeline is compiled into the YAML format, you use the Vertex AI Python SDK to submit and run your pipeline.\n",
+ "\n",
+ "The pipeline requires **~15 mins** to run."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "GJlkU5XbKRwT"
+ },
+ "outputs": [],
+ "source": [
+ "pipeline_params = {\n",
+ " \"input_eval_dataset_uri\": INPUT_EVALUATION_DATASET_URI,\n",
+ " \"output_eval_dataset_uri\": OUTPUT_EVALUATION_DATASET_URI,\n",
+ " \"args\": [\n",
+ " \"--runner\",\n",
+ " \"DataflowRunner\",\n",
+ " \"--setup_file\",\n",
+ " str(SRC_URI / \"setup.py\"),\n",
+ " \"--project\",\n",
+ " PROJECT_ID,\n",
+ " \"--region\",\n",
+ " REGION,\n",
+ " ],\n",
+ " \"requirements_file_path\": str(SRC_URI / \"requirements.txt\"),\n",
+ " \"python_file_path\": str(SRC_URI / \"main.py\"),\n",
+ " \"temp_location\": str(TMP_URI),\n",
+ " \"project_id\": PROJECT_ID,\n",
+ " \"location\": REGION,\n",
+ " \"staging_dir\": str(PIPELINE_ROOT_URI),\n",
+ "}\n",
+ "\n",
+ "\n",
+ "pipeline_job = aiplatform.PipelineJob(\n",
+ " display_name=\"evaluate_rag_batch_eval\",\n",
+ " template_path=str(PIPELINE_PATH / \"eval_pipeline.json\"),\n",
+ " parameter_values=pipeline_params,\n",
+ " pipeline_root=str(PIPELINE_ROOT_URI),\n",
+ " enable_caching=False,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "24Ul1JN6KRwT"
+ },
+ "outputs": [],
+ "source": [
+ "pipeline_job.run()"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "mJ_XStxpWu7-"
+ },
+ "source": [
+ "### Get evaluation results\n",
+ "\n",
+ "After the pipeline run is successfully completed, you can both retrive the RAG eval metrics at row and aggregated levels."
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "9umtyu3WrfGt"
+ },
+ "source": [
+ "#### Row-level metrics\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "e1VG_qhi6uCN"
+ },
+ "outputs": [],
+ "source": [
+ "for details in pipeline_job.task_details:\n",
+ " if details.task_name == \"get-output-eval-file-uri\":\n",
+ " break\n",
+ "\n",
+ "# row-level-metrics\n",
+ "rapid_eval_row_metrics_uri = details.outputs[\"output_eval_dataset\"].artifacts[0].uri\n",
+ "rapid_eval_row_metrics_df = pd.read_json(rapid_eval_row_metrics_uri, lines=True)\n",
+ "print_content(rapid_eval_row_metrics_df, columns=rapid_eval_row_metrics_df.columns, n=3)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "yzM_qk13r2On"
+ },
+ "source": [
+ "#### Aggregate metrics"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "ksSLDcBG8pm7"
+ },
+ "outputs": [],
+ "source": [
+ "for details in pipeline_job.task_details:\n",
+ " if details.task_name == \"visualize-eval-qa-metrics\":\n",
+ " break\n",
+ "\n",
+ "# aggregated-metrics\n",
+ "rapid_eval_aggregated_metrics_uri = (\n",
+ " details.outputs[\"output_eval_summary_metrics\"].artifacts[0].uri\n",
+ ")\n",
+ "rapid_eval_aggregated_metrics_df = pd.read_json(\n",
+ " rapid_eval_aggregated_metrics_uri, lines=True\n",
+ ")\n",
+ "visualize_eval_qa_summary_metrics(rapid_eval_aggregated_metrics_df)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "2a4e033321ad"
+ },
+ "source": [
+ "## Cleaning up"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {
+ "id": "xMJjn9l8DYpD"
+ },
+ "outputs": [],
+ "source": [
+ "delete_bucket = False\n",
+ "delete_pipeline = False\n",
+ "delete_tutorial_dir = False\n",
+ "\n",
+ "if delete_bucket:\n",
+ " ! gsutil -m rm -r $BUCKET_URI\n",
+ "\n",
+ "if delete_pipeline:\n",
+ " pipeline_list = aiplatform.PipelineJob.list()\n",
+ " for pipeline in pipeline_list:\n",
+ " if pipeline.display_name == \"evaluate_rag_batch_eval\":\n",
+ " pipeline.delete()\n",
+ "\n",
+ "if delete_tutorial_dir:\n",
+ " import shutil\n",
+ "\n",
+ " shutil.rmtree(str(TUTORIAL_PATH))"
+ ]
+ }
+ ],
+ "metadata": {
+ "colab": {
+ "name": "evaluate_rag_pipeline.ipynb",
+ "toc_visible": true
+ },
+ "kernelspec": {
+ "display_name": "Python 3",
+ "name": "python3"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 0
+}