diff --git a/infrastructure/main.tf b/infrastructure/main.tf index 1f120c60..8322f83e 100644 --- a/infrastructure/main.tf +++ b/infrastructure/main.tf @@ -217,6 +217,21 @@ resource "aws_lambda_function" "workflows_api_handler" { } } +resource "null_resource" "update_workflows_lambda_image" { + triggers = { + always_run = "${timestamp()}" + } + + depends_on = [aws_lambda_function.workflows_api_handler, null_resource.if_change_run_provisioner] + provisioner "local-exec" { + command = <<-EOT + set -e + aws lambda update-function-code \ + --function-name ${aws_lambda_function.workflows_api_handler.function_name} \ + --image-uri ${aws_ecr_repository.workflows_api_lambda_repository.repository_url}:latest + EOT + } +} # API Gateway HTTP API resource "aws_apigatewayv2_api" "workflows_http_api" { diff --git a/workflows_api/runtime/requirements.txt b/workflows_api/runtime/requirements.txt index aeacc812..ff75686d 100644 --- a/workflows_api/runtime/requirements.txt +++ b/workflows_api/runtime/requirements.txt @@ -17,3 +17,5 @@ xarray==2023.1.0 xstac==1.1.0 zarr==2.13.6 boto3==1.24.59 +aws_xray_sdk>=2.6.0,<3 +aws-lambda-powertools>=1.18.0 diff --git a/workflows_api/runtime/src/main.py b/workflows_api/runtime/src/main.py index 5b04a49a..e12b059c 100644 --- a/workflows_api/runtime/src/main.py +++ b/workflows_api/runtime/src/main.py @@ -1,4 +1,3 @@ -import logging from typing import Union import requests @@ -7,14 +6,16 @@ import src.config as config import src.schemas as schemas from src.collection_publisher import CollectionPublisher, Publisher +from src.monitoring import LoggerRouteHandler, logger, metrics, tracer +from aws_lambda_powertools.metrics import MetricUnit -from fastapi import Body, Depends, FastAPI, HTTPException +from fastapi import Body, Depends, FastAPI, HTTPException, APIRouter from fastapi.exceptions import RequestValidationError from fastapi.responses import JSONResponse +from starlette.requests import Request -settings = config.Settings() -logger = logging.getLogger(__name__) +settings = config.Settings() collection_publisher = CollectionPublisher() publisher = Publisher() @@ -31,6 +32,7 @@ root_path=settings.workflow_root_path, openapi_url="/openapi.json", docs_url="/docs", + router = APIRouter(route_class=LoggerRouteHandler) ) @@ -149,7 +151,42 @@ async def send_cli_command(cli_command: str): return airflow_helpers.send_cli_command(cli_command) + +# If the correlation header is used in the UI, we can analyze traces that originate from a given user or client +@app.middleware("http") +async def add_correlation_id(request: Request, call_next): + """Add correlation ids to all requests and subsequent logs/traces""" + # Get correlation id from X-Correlation-Id header if provided + corr_id = request.headers.get("x-correlation-id") + if not corr_id: + try: + # If empty, use request id from aws context + corr_id = request.scope["aws.context"].aws_request_id + except KeyError: + # If empty, use uuid + corr_id = "local" + + # Add correlation id to logs + logger.set_correlation_id(corr_id) + + # Add correlation id to traces + tracer.put_annotation(key="correlation_id", value=corr_id) + + response = await tracer.capture_method(call_next)(request) + # Return correlation header in response + response.headers["X-Correlation-Id"] = corr_id + logger.info("Request completed") + return response + # exception handling @workflows_app.exception_handler(RequestValidationError) async def validation_exception_handler(request, exc): + metrics.add_metric(name="ValidationErrors", unit=MetricUnit.Count, value=1) return JSONResponse(str(exc), status_code=422) + +@app.exception_handler(Exception) +async def general_exception_handler(request, err): + """Handle exceptions that aren't caught elsewhere""" + metrics.add_metric(name="UnhandledExceptions", unit=MetricUnit.Count, value=1) + logger.exception(f"Unhandled exception: {err}") + return JSONResponse(status_code=500, content={"detail": "Internal Server Error"})