diff --git a/README.md b/README.md index ee6476e0..554efc16 100644 --- a/README.md +++ b/README.md @@ -72,6 +72,12 @@ End-to-end example use-cases built using modules in this repository. | [Example DAG for MLOps Module](modules/examples/airflow-dags/README.md) | Deploys a Sample DAG in MWAA demonstrating MLOPs and it is using MWAA module from IDF | +### MLOps using Step Functions Module + +| Type | Description | +|-------------------------------------------------------------------------|------------------------------------------------------------------------------------------| +| [Example for MLOps using Step Functions](modules/examples/mlops-stepfunctions/README.md) | Deploys a AWS State Machine in AWS Step Functions demonstrating how to implement the MLOPs using AWS Step Functions | + ### EKS Modules | Type | Description | diff --git a/manifests/mlops-stepfunctions/deployment.yaml b/manifests/mlops-stepfunctions/deployment.yaml new file mode 100644 index 00000000..261155f2 --- /dev/null +++ b/manifests/mlops-stepfunctions/deployment.yaml @@ -0,0 +1,15 @@ +name: mlops-stepfunctions +toolchainRegion: us-east-1 +forceDependencyRedeploy: true +groups: + - name: stepfunctions + path: manifests/mlops-stepfunctions/mlops-stepfunctions.yaml +targetAccountMappings: + - alias: primary + accountId: + valueFrom: + envVariable: PRIMARY_ACCOUNT + default: true + regionMappings: + - region: us-east-1 + default: true diff --git a/manifests/mlops-stepfunctions/mlops-stepfunctions.yaml b/manifests/mlops-stepfunctions/mlops-stepfunctions.yaml new file mode 100644 index 00000000..539fb590 --- /dev/null +++ b/manifests/mlops-stepfunctions/mlops-stepfunctions.yaml @@ -0,0 +1,10 @@ +name: stepfunctions +path: modules/examples/mlops-stepfunctions/ +targetAccount: primary +parameters: + - name: model-name + # Replace value with the valid model name + value: demo + - name: schedule + # Replace the value with cron schedule to execute the state machine created for MLOps + value: "0 6 * * ? *" diff --git a/modules/examples/mlops-stepfunctions/README.md b/modules/examples/mlops-stepfunctions/README.md new file mode 100644 index 00000000..ff08dbe6 --- /dev/null +++ b/modules/examples/mlops-stepfunctions/README.md @@ -0,0 +1,368 @@ +## Introduction +Machine Learning Operations (MLOps) is the practice of operationalizing the end-to-end machine learning workflow, from data preparation to model deployment and monitoring. AWS Step Functions, combined with AWS SageMaker, provides a powerful solution for building and orchestrating MLOps pipelines. + +AWS Step Functions is a serverless function orchestrator that allows you to coordinate multiple AWS services into flexible, resilient, and highly available workflows. It acts as the glue that ties together various components of your MLOps pipeline, ensuring that tasks are executed in the correct order and with the required dependencies. + +AWS SageMaker, on the other hand, is a fully managed machine learning service that provides tools and resources for building, training, and deploying machine learning models. It simplifies the complexities of managing infrastructure and libraries for your machine learning workloads. + + +## Description + +This module shows how to integrate the AWS Step Functions with SageMaker, to create robust and scalable MLOps pipelines that automate the entire machine learning lifecycle. + +Here's a typical workflow: + +1. Data Preprocessing: Using AWS SageMaker Processing, you can preprocess your data by performing tasks such as data cleaning, feature engineering, and data splitting. + +2. Model Training: Leverage SageMaker's built-in algorithms or bring your own custom code to train your machine learning model on the preprocessed data. + +3. Model Evaluation: Evaluate the trained model's performance using metrics like accuracy, precision, recall, or custom metrics specific to your use case. + +4. Model Approval: Implement manual or automated approval steps to review the model's performance and decide whether to proceed with deployment. + +5. Model Deployment: Deploy your trained model to a SageMaker endpoint, making it available for real-time inference or batch processing. + +#### sample event for lambda function which will start the state machine +```json +{ + "config": { + "bucket": "mlops-bucket", + "prefix": "demo/scripts/input.yaml" + } +} +``` + +### input to step function +Input to the state machine will be the json data generated from the yaml which is mentioned in input of lambda function as prefix. + +Update the input.yaml as required. Refer https://docs.aws.amazon.com/step-functions/latest/dg/connect-sagemaker.html for supported inputs by step functions to connect to sagemaker. + +#### sample input which is used to start the state machine. + +```json +{ + "app_id": "aiops", + "model_id": "demo", + "job_prefix": "mlops", + "preprocessing": { + "run": true, + "input": { + "AppSpecification": { + "ImageUri": "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3", + "ContainerEntrypoint": [ + "python3", + "/opt/ml/processing/code/preprocessing.py" + ] + }, + "ProcessingResources": { + "ClusterConfig": { + "InstanceType": "ml.m5.xlarge", + "InstanceCount": 1, + "VolumeSizeInGB": 50 + } + }, + "ProcessingInputs": [ + { + "InputName": "input", + "AppManaged": false, + "S3Input": { + "S3Uri": "s3://sagemaker-sample-data-us-east-1/processing/census", + "LocalPath": "/opt/ml/processing/input", + "S3DataType": "S3Prefix", + "S3InputMode": "File", + "S3DataDistributionType": "FullyReplicated" + } + }, + { + "InputName": "Code", + "AppManaged": false, + "S3Input": { + "S3Uri": "s3://mlops-bucket/demo/scripts", + "LocalPath": "/opt/ml/processing/code", + "S3DataType": "S3Prefix", + "S3InputMode": "File", + "S3DataDistributionType": "FullyReplicated" + } + } + ], + "ProcessingOutputConfig": { + "Outputs": [ + { + "OutputName": "train", + "AppManaged": false, + "S3Output": { + "S3Uri": "s3://mlops-bucket/demo/processing/train", + "LocalPath": "/opt/ml/processing/train", + "S3UploadMode": "EndOfJob" + } + }, + { + "OutputName": "test", + "AppManaged": false, + "S3Output": { + "S3Uri": "s3://mlops-bucket/demo/processing/test", + "LocalPath": "/opt/ml/processing/test", + "S3UploadMode": "EndOfJob" + } + } + ] + }, + "StoppingCondition": { + "MaxRuntimeInSeconds": 3600 + }, + "AppManaged": false, + "Tags": [ + { + "Key": "APP_ID", + "Value": "aiops" + } + ], + "Environment": null, + "NetworkConfig": null, + "RoleArn": "arn:aws:iam::123456789012:role/SageMakerExecutionRole" + } + }, + "training": { + "run": true, + "input": { + "AlgorithmSpecification": { + "TrainingImage": "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3", + "ContainerEntrypoint": [ + "python3", + "/opt/ml/input/data/code/train.py" + ], + "TrainingInputMode": "FastFile" + }, + "HyperParameters": null, + "ResourceConfig": { + "InstanceType": "ml.m5.xlarge", + "InstanceCount": 1, + "VolumeSizeInGB": 50 + }, + "InputDataConfig": [ + { + "ChannelName": "training", + "DataSource": { + "S3DataSource": { + "S3DataType": "S3Prefix", + "S3Uri": "s3://mlops-bucket/demo/processing/train", + "S3DataDistributionType": "FullyReplicated" + } + } + }, + { + "ChannelName": "code", + "DataSource": { + "S3DataSource": { + "S3DataType": "S3Prefix", + "S3Uri": "s3://mlops-bucket/demo/scripts", + "S3DataDistributionType": "FullyReplicated" + } + } + } + ], + "OutputDataConfig": { + "S3OutputPath": "s3://mlops-bucket/demo/model/" + }, + "StoppingCondition": { + "MaxRuntimeInSeconds": 3600 + }, + "Tags": [ + { + "Key": "APP_ID", + "Value": "aiops" + } + ], + "Environment": null, + "RetryStrategy": null, + "VpcConfig": null, + "RoleArn": "arn:aws:iam::123456789012:role/SageMakerExecutionRole" + } + }, + "evaluation": { + "run": true, + "input": { + "AppSpecification": { + "ImageUri": "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3", + "ContainerEntrypoint": [ + "python3", + "/opt/ml/processing/code/evaluation.py" + ] + }, + "ProcessingResources": { + "ClusterConfig": { + "InstanceType": "ml.m5.xlarge", + "InstanceCount": 1, + "VolumeSizeInGB": 50 + } + }, + "ProcessingInputs": [ + { + "InputName": "input", + "AppManaged": false, + "S3Input": { + "S3Uri": "s3://mlops-bucket/demo/model/mlops-demo-1724940337/output/model.tar.gz", + "LocalPath": "/opt/ml/processing/model", + "S3DataType": "S3Prefix", + "S3InputMode": "File", + "S3DataDistributionType": "FullyReplicated" + } + }, + { + "InputName": "Code", + "AppManaged": false, + "S3Input": { + "S3Uri": "s3://mlops-bucket/demo/scripts", + "LocalPath": "/opt/ml/processing/code", + "S3DataType": "S3Prefix", + "S3InputMode": "File", + "S3DataDistributionType": "FullyReplicated" + } + }, + { + "InputName": "test", + "AppManaged": false, + "S3Input": { + "S3Uri": "s3://mlops-bucket/demo/processing/test", + "LocalPath": "/opt/ml/processing/test", + "S3DataType": "S3Prefix", + "S3InputMode": "File", + "S3DataDistributionType": "FullyReplicated" + } + } + ], + "ProcessingOutputConfig": { + "Outputs": [ + { + "OutputName": "evaluation", + "AppManaged": false, + "S3Output": { + "S3Uri": "s3://mlops-bucket/demo/evaluation/output", + "LocalPath": "/opt/ml/processing/evaluation", + "S3UploadMode": "EndOfJob" + } + } + ] + }, + "StoppingCondition": { + "MaxRuntimeInSeconds": 3600 + }, + "AppManaged": false, + "Tags": [ + { + "Key": "APP_ID", + "Value": "aiops" + } + ], + "Environment": null, + "NetworkConfig": null, + "RoleArn": "arn:aws:iam::123456789012:role/SageMakerExecutionRole" + } + }, + "CreateModel": { + "run": true, + "input": { + "EnableNetworkIsolation": null, + "Containers": null, + "VpcConfig": null, + "PrimaryContainer": { + "Image": "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3", + "ModelDataUrl": "s3://mlops-bucket/demo/model/mlops-demo-1724940337/output/model.tar.gz", + "Environment": { + "SAGEMAKER_PROGRAM": "inference.py", + "SAGEMAKER_SUBMIT_DIRECTORY": "s3://mlops-bucket/demo/scripts/source.tar.gz" + } + }, + "ExecutionRoleArn": "arn:aws:iam::123456789012:role/SageMakerExecutionRole" + } + }, + "batchTransform": { + "run": true, + "input": { + "BatchStrategy": "MultiRecord", + "Environment": { + "APP_ID": "aiops" + }, + "MaxConcurrentTransforms": 2, + "MaxPayloadInMB": 50, + "TransformInput": { + "ContentType": "text/csv", + "SplitType": "Line", + "DataSource": { + "S3DataSource": { + "S3DataType": "S3Prefix", + "S3Uri": "s3://mlops-bucket/demo/processing/test/test_features.csv" + } + } + }, + "TransformOutput": { + "Accept": "text/csv", + "AssembleWith": "Line", + "S3OutputPath": "s3://mlops-bucket/demo/batch-output/mlops-demo-1724940337/" + }, + "TransformResources": { + "InstanceType": "ml.m5.xlarge", + "InstanceCount": 1 + }, + "Tags": [ + { + "Key": "APP_ID", + "Value": "aiops" + } + ] + } + } +} +``` + + + +# Deployment Guide + +See deployment steps in the [Deployment Guide](../../../DEPLOYMENT.md). + + +## Inputs/Outputs + +### Input Parameters + +#### Required + +- `schedule`: cron expression to schedule the event to run the statemachine. + +#### Optional + +- `model-name` : Model Identifier (default it is "demo") + +## Sample manifest declaration + +Create a manifest file under appropriate location, for example examples/manifests +``` +name: mlops-stepfunctions +path: git::https://github.com/awslabs/aiops-modules.git//modules/examples/mlops-stepfunctions?ref=release/1.4.0&depth=1 +parameters: + - name: model-name + value: demo + - name: schedule + value: "0 6 * * ? *" +``` + +### Module Metadata Outputs + +- `MlOpsBucket`: Name of the Bucket where Model Artifacts are stored. +- `SageMakerExecutionRole`: Execution Roles used by SageMaker Service. +- `ImageUri`: Docker Image URI used by SageMaker Jobs. +- `StateMachine`: ARN of State Machine. +- `LambdaFunction`: ARN of Lambda function which starts the execution of State Machine. + +#### Output Example + +```yaml +metadata: | { + "MlOpsBucket": "", + "SageMakerExecutionRole": "arn:aws:iam::123456789012:role/SageMakerExecutionRole", + "ImageUri": "683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3", + "StateMachine": "arn:aws:states:us-east-1:123456789012:stateMachine:MLOpsStateMachine", + "LambdaFunction": "arn:aws:lambda:us-east-1:123456789012:function:MlOpsLambdaFunction", +} +``` diff --git a/modules/examples/mlops-stepfunctions/app.py b/modules/examples/mlops-stepfunctions/app.py new file mode 100644 index 00000000..cb69bd88 --- /dev/null +++ b/modules/examples/mlops-stepfunctions/app.py @@ -0,0 +1,59 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import aws_cdk +import cdk_nag +from aws_cdk import App +from sagemaker import image_uris # type: ignore[import-untyped] +from sagemaker.sklearn import defaults # type: ignore[import-untyped] + +from settings import ApplicationSettings +from stack import MLOPSSFNResources + +app = App() + +app_settings = ApplicationSettings() + + +stack = MLOPSSFNResources( + scope=app, + id=app_settings.seedfarmer_settings.app_prefix, + project_name=app_settings.seedfarmer_settings.project_name, + deployment_name=app_settings.seedfarmer_settings.deployment_name, + module_name=app_settings.seedfarmer_settings.module_name, + model_name=app_settings.module_settings.model_name, + schedule=app_settings.module_settings.schedule, + env=aws_cdk.Environment( + account=app_settings.cdk_settings.account, + region=app_settings.cdk_settings.region, + ), +) + + +image_uri = image_uris.retrieve(defaults.SKLEARN_NAME, region=app_settings.cdk_settings.region, version="1.2-1") + +aws_cdk.CfnOutput( + scope=stack, + id="metadata", + value=stack.to_json_string( + { + "MlOpsBucket": stack.mlops_assets_bucket.bucket_name, + "SageMakerExecutionRole": stack.sagemaker_execution_role.role_arn, + "ImageUri": image_uri, + "StateMachine": stack.state_machine_arn, + "LambdaFunction": stack.lambda_function_arn, + } + ), +) + +aws_cdk.Aspects.of(stack).add(cdk_nag.AwsSolutionsChecks(log_ignores=True)) + +if app_settings.module_settings.tags: + for tag_key, tag_value in app_settings.module_settings.tags.items(): + aws_cdk.Tags.of(app).add(tag_key, tag_value) + +aws_cdk.Tags.of(app).add("SeedFarmerDeploymentName", app_settings.seedfarmer_settings.deployment_name) +aws_cdk.Tags.of(app).add("SeedFarmerModuleName", app_settings.seedfarmer_settings.module_name) +aws_cdk.Tags.of(app).add("SeedFarmerProjectName", app_settings.seedfarmer_settings.project_name) + +app.synth(force=True) diff --git a/modules/examples/mlops-stepfunctions/deployspec.yaml b/modules/examples/mlops-stepfunctions/deployspec.yaml new file mode 100644 index 00000000..aec33479 --- /dev/null +++ b/modules/examples/mlops-stepfunctions/deployspec.yaml @@ -0,0 +1,40 @@ +publishGenericEnvVariables: true +deploy: + phases: + install: + commands: + - npm install -g aws-cdk@2.153.0 + - pip install -r requirements.txt + build: + commands: + - > + echo "SEEDFARMER_MODULE_METADATA: ${SEEDFARMER_MODULE_METADATA}" + - cdk deploy --require-approval never --progress events --app "python app.py" --outputs-file ./cdk-exports.json + - seedfarmer metadata convert -f cdk-exports.json || true + - export MLOPS_BUCKET=$(cat SEEDFARMER_MODULE_METADATA | jq -r ".MlOpsBucket") + - export SAGEMAKER_EXECUTION_ROLE=$(cat SEEDFARMER_MODULE_METADATA | jq -r ".SageMakerExecutionRole") + - export IMAGE_URI=$(cat SEEDFARMER_MODULE_METADATA | jq -r ".ImageUri") + - > + echo "MLOPS_BUCKET: ${MLOPS_BUCKET}" + - > + echo "SAGEMAKER_EXECUTION_ROLE: ${SAGEMAKER_EXECUTION_ROLE}" + - sed -i "s/AWS_REGION_NAME/${AWS_DEFAULT_REGION}/g" scripts/input.yaml + - sed -i "s/MLOPS_BUCKET/${MLOPS_BUCKET}/g" scripts/input.yaml + - export SAGEMAKER_EXECUTION_ROLE=$(echo ${SAGEMAKER_EXECUTION_ROLE} | sed -e "s#/#\\\/#g") + - sed -i "s/SAGEMAKER_EXECUTION_ROLE/${SAGEMAKER_EXECUTION_ROLE}/g" scripts/input.yaml + - export IMAGE_URI=$(echo ${IMAGE_URI} | sed -e "s#/#\\\/#g") + - sed -i "s/IMAGE_URI/${IMAGE_URI}/g" scripts/input.yaml + - sed -i "s/MODEL_NAME/${SEEDFARMER_PARAMETER_MODEL_NAME}/g" scripts/input.yaml + - tar -czf scripts/source.tar.gz -C scripts inference.py + - aws s3 cp --recursive scripts/ s3://$MLOPS_BUCKET/$SEEDFARMER_PARAMETER_MODEL_NAME/scripts/ +destroy: + phases: + install: + commands: + - npm install -g aws-cdk@2.137.0 + - pip install -r requirements.txt + build: + commands: + - export MLOPS_BUCKET=$(echo ${SEEDFARMER_MODULE_METADATA} | jq -r ".MlOpsBucket") + - aws s3 rm --recursive s3://$MLOPS_BUCKET/ + - cdk destroy --force --app "python app.py" diff --git a/modules/examples/mlops-stepfunctions/lambda/handler.py b/modules/examples/mlops-stepfunctions/lambda/handler.py new file mode 100644 index 00000000..3cd741bc --- /dev/null +++ b/modules/examples/mlops-stepfunctions/lambda/handler.py @@ -0,0 +1,32 @@ +import json +import logging +import os +import time + +import boto3 # type: ignore[import-untyped] +import yaml # type: ignore[import-untyped] + +# Configure the logger +logger = logging.getLogger() +logger.setLevel(logging.INFO) + + +def lambda_handler(event, context): # type: ignore[no-untyped-def] + logger.info(f"event: {event}") + s3 = boto3.client("s3") + + bucket_name = event["config"]["bucket"] + prefix = event["config"]["prefix"] + response = s3.get_object(Bucket=bucket_name, Key=prefix) + + yaml_data = response["Body"].read().decode("utf-8") + configuration = yaml.safe_load(yaml_data) + execution_name = f'{configuration["job_prefix"]}-{configuration["model_id"]}-{int(time.time())}' + stateMachineArn = os.environ["STATE_MACHINE_ARN"] + sfn = boto3.client("stepfunctions") + input_data = json.dumps(configuration) + input_data = input_data.replace("SFN_EXECUTION_ID", execution_name) + response = sfn.start_execution(stateMachineArn=stateMachineArn, name=execution_name, input=input_data) + logger.info(f"sfn response: {response}") + + return {"statusCode": 200, "body": json.dumps("Success")} diff --git a/modules/examples/mlops-stepfunctions/lambda/requirements.txt b/modules/examples/mlops-stepfunctions/lambda/requirements.txt new file mode 100644 index 00000000..4818cc54 --- /dev/null +++ b/modules/examples/mlops-stepfunctions/lambda/requirements.txt @@ -0,0 +1 @@ +pyyaml \ No newline at end of file diff --git a/modules/examples/mlops-stepfunctions/modulestack.yaml b/modules/examples/mlops-stepfunctions/modulestack.yaml new file mode 100644 index 00000000..d4a8f84b --- /dev/null +++ b/modules/examples/mlops-stepfunctions/modulestack.yaml @@ -0,0 +1,24 @@ +AWSTemplateFormatVersion: 2010-09-09 +Description: This template deploys a Module specific IAM permissions + +Parameters: + RoleName: + Type: String + Description: The name of the IAM Role + +Resources: + Policy: + Type: "AWS::IAM::Policy" + Properties: + PolicyDocument: + Statement: + - Action: + - "s3:Put*" + - "s3:Delete*" + - "s3:Get*" + - "s3:List*" + Effect: Allow + Resource: "*" + Version: 2012-10-17 + PolicyName: "mlops-stepfunctions-modulespecific-policy" + Roles: [!Ref RoleName] diff --git a/modules/examples/mlops-stepfunctions/pyproject.toml b/modules/examples/mlops-stepfunctions/pyproject.toml new file mode 100644 index 00000000..38fdde7a --- /dev/null +++ b/modules/examples/mlops-stepfunctions/pyproject.toml @@ -0,0 +1,45 @@ +[tool.ruff] +exclude = [ + ".eggs", + ".git", + ".hg", + ".mypy_cache", + ".ruff_cache", + ".tox", + ".venv", + ".env", + "_build", + "buck-out", + "build", + "dist", + "codeseeder.out", +] +line-length = 120 +target-version = "py38" + +[tool.ruff.lint] +select = ["F", "I", "E", "W"] +fixable = ["ALL"] + +[tool.mypy] +python_version = "3.8" +strict = true +disallow_untyped_decorators = false +exclude = "codeseeder.out/|dags/" +warn_unused_ignores = false + +plugins = [ + "pydantic.mypy" +] + +[tool.pytest.ini_options] +addopts = "-v --cov=. --cov-report term" +pythonpath = [ + "." +] + +[tool.coverage.run] +omit = ["tests/*", "dags/*"] + +[tool.coverage.report] +fail_under = 80 diff --git a/modules/examples/mlops-stepfunctions/requirements.in b/modules/examples/mlops-stepfunctions/requirements.in new file mode 100644 index 00000000..162ec72e --- /dev/null +++ b/modules/examples/mlops-stepfunctions/requirements.in @@ -0,0 +1,12 @@ +aws-cdk-lib==2.153.0 +cdk-nag==2.28.185 +boto3==1.35.1 +pydantic==2.7.4 +pydantic-settings==2.3.3 +aws-cdk-aws-lambda-python-alpha==2.153.0a0 +sagemaker==2.229.0 +pandas==2.2.2 +scikit-learn==1.5.1 +joblib-stubs==1.4.2.3.20240619 +pandas-stubs==2.2.2.240807 +types-pyyaml==6.0.12.20240808 diff --git a/modules/examples/mlops-stepfunctions/requirements.txt b/modules/examples/mlops-stepfunctions/requirements.txt new file mode 100644 index 00000000..5b131856 --- /dev/null +++ b/modules/examples/mlops-stepfunctions/requirements.txt @@ -0,0 +1,211 @@ +# +# This file is autogenerated by pip-compile with Python 3.9 +# by the following command: +# +# pip-compile --output-file=requirements.txt requirements.in +# +annotated-types==0.7.0 + # via pydantic +attrs==23.2.0 + # via + # cattrs + # jsii + # jsonschema + # referencing + # sagemaker +aws-cdk-asset-awscli-v1==2.2.202 + # via aws-cdk-lib +aws-cdk-asset-kubectl-v20==2.1.2 + # via aws-cdk-lib +aws-cdk-asset-node-proxy-agent-v6==2.0.3 + # via aws-cdk-lib +aws-cdk-aws-lambda-python-alpha==2.153.0a0 + # via -r requirements.in +aws-cdk-lib==2.153.0 + # via + # -r requirements.in + # aws-cdk-aws-lambda-python-alpha + # cdk-nag +boto3==1.35.1 + # via + # -r requirements.in + # sagemaker +botocore==1.35.1 + # via + # boto3 + # s3transfer +cattrs==22.1.0 + # via jsii +cdk-nag==2.28.185 + # via -r requirements.in +certifi==2024.7.4 + # via requests +charset-normalizer==3.3.2 + # via requests +cloudpickle==2.2.1 + # via sagemaker +constructs==10.1.37 + # via + # aws-cdk-aws-lambda-python-alpha + # aws-cdk-lib + # cdk-nag +dill==0.3.8 + # via + # multiprocess + # pathos +docker==7.1.0 + # via sagemaker +exceptiongroup==1.2.2 + # via cattrs +google-pasta==0.2.0 + # via sagemaker +idna==3.7 + # via requests +importlib-metadata==6.11.0 + # via sagemaker +importlib-resources==6.4.0 + # via jsii +jmespath==1.0.0 + # via + # boto3 + # botocore +joblib==1.4.2 + # via scikit-learn +joblib-stubs==1.4.2.3.20240619 + # via -r requirements.in +jsii==1.102.0 + # via + # aws-cdk-asset-awscli-v1 + # aws-cdk-asset-kubectl-v20 + # aws-cdk-asset-node-proxy-agent-v6 + # aws-cdk-aws-lambda-python-alpha + # aws-cdk-lib + # cdk-nag + # constructs +jsonschema==4.23.0 + # via sagemaker +jsonschema-specifications==2023.12.1 + # via jsonschema +multiprocess==0.70.16 + # via pathos +numpy==1.26.4 + # via + # pandas + # pandas-stubs + # sagemaker + # scikit-learn + # scipy +packaging==24.1 + # via sagemaker +pandas==2.2.2 + # via + # -r requirements.in + # sagemaker +pandas-stubs==2.2.2.240807 + # via -r requirements.in +pathos==0.3.2 + # via sagemaker +platformdirs==4.2.2 + # via sagemaker +pox==0.3.4 + # via pathos +ppft==1.7.6.8 + # via pathos +protobuf==4.25.4 + # via sagemaker +psutil==6.0.0 + # via sagemaker +publication==0.0.3 + # via + # aws-cdk-asset-awscli-v1 + # aws-cdk-asset-kubectl-v20 + # aws-cdk-asset-node-proxy-agent-v6 + # aws-cdk-aws-lambda-python-alpha + # aws-cdk-lib + # cdk-nag + # constructs + # jsii +pydantic==2.7.4 + # via + # -r requirements.in + # pydantic-settings +pydantic-core==2.18.4 + # via pydantic +pydantic-settings==2.3.3 + # via -r requirements.in +python-dateutil==2.8.2 + # via + # botocore + # jsii + # pandas +python-dotenv==1.0.1 + # via pydantic-settings +pytz==2024.1 + # via pandas +pyyaml==6.0.2 + # via sagemaker +referencing==0.35.1 + # via + # jsonschema + # jsonschema-specifications +requests==2.32.3 + # via + # docker + # sagemaker +rpds-py==0.20.0 + # via + # jsonschema + # referencing +s3transfer==0.10.1 + # via boto3 +sagemaker==2.229.0 + # via -r requirements.in +schema==0.7.7 + # via sagemaker +scikit-learn==1.5.1 + # via -r requirements.in +scipy==1.13.1 + # via scikit-learn +six==1.16.0 + # via + # google-pasta + # python-dateutil +smdebug-rulesconfig==1.0.1 + # via sagemaker +tblib==3.0.0 + # via sagemaker +threadpoolctl==3.5.0 + # via scikit-learn +tqdm==4.66.5 + # via sagemaker +typeguard==2.13.3 + # via + # aws-cdk-asset-awscli-v1 + # aws-cdk-asset-kubectl-v20 + # aws-cdk-asset-node-proxy-agent-v6 + # aws-cdk-aws-lambda-python-alpha + # aws-cdk-lib + # cdk-nag + # jsii +types-pytz==2024.1.0.20240417 + # via pandas-stubs +types-pyyaml==6.0.12.20240808 + # via -r requirements.in +typing-extensions==4.7.1 + # via + # joblib-stubs + # jsii + # pydantic + # pydantic-core +tzdata==2024.1 + # via pandas +urllib3==1.26.19 + # via + # botocore + # docker + # requests + # sagemaker +zipp==3.20.0 + # via + # importlib-metadata + # importlib-resources diff --git a/modules/examples/mlops-stepfunctions/scripts/evaluation.py b/modules/examples/mlops-stepfunctions/scripts/evaluation.py new file mode 100644 index 00000000..1b2f9e70 --- /dev/null +++ b/modules/examples/mlops-stepfunctions/scripts/evaluation.py @@ -0,0 +1,36 @@ +import json +import os +import tarfile + +import joblib +import pandas as pd +from sklearn.metrics import accuracy_score, classification_report, roc_auc_score # type: ignore[import-untyped] + +if __name__ == "__main__": + model_path = os.path.join("/opt/ml/processing/model", "model.tar.gz") + print("Extracting model from path: {}".format(model_path)) + with tarfile.open(model_path) as tar: + tar.extractall(path=".") + print("Loading model") + model = joblib.load("model.joblib") + + print("Loading test input data") + test_features_data = os.path.join("/opt/ml/processing/test", "test_features.csv") + test_labels_data = os.path.join("/opt/ml/processing/test", "test_labels.csv") + + X_test = pd.read_csv(test_features_data, header=None) + y_test = pd.read_csv(test_labels_data, header=None) + predictions = model.predict(X_test) + + print("Creating classification evaluation report") + report_dict = classification_report(y_test, predictions, output_dict=True) + report_dict["accuracy"] = accuracy_score(y_test, predictions) + report_dict["roc_auc"] = roc_auc_score(y_test, predictions) + + print("Classification report:\n{}".format(report_dict)) + + evaluation_output_path = os.path.join("/opt/ml/processing/evaluation", "evaluation.json") + print("Saving classification report to {}".format(evaluation_output_path)) + + with open(evaluation_output_path, "w") as f: + f.write(json.dumps(report_dict)) diff --git a/modules/examples/mlops-stepfunctions/scripts/inference.py b/modules/examples/mlops-stepfunctions/scripts/inference.py new file mode 100644 index 00000000..ec6a52bd --- /dev/null +++ b/modules/examples/mlops-stepfunctions/scripts/inference.py @@ -0,0 +1,9 @@ +import os + +import joblib + + +def model_fn(model_dir: str): # type: ignore[no-untyped-def] + print("loading model.joblib from: {}".format(model_dir)) + loaded_model = joblib.load(os.path.join(model_dir, "model.joblib")) + return loaded_model diff --git a/modules/examples/mlops-stepfunctions/scripts/input.yaml b/modules/examples/mlops-stepfunctions/scripts/input.yaml new file mode 100644 index 00000000..db1072c3 --- /dev/null +++ b/modules/examples/mlops-stepfunctions/scripts/input.yaml @@ -0,0 +1,187 @@ +app_id: aiops +model_id: MODEL_NAME +job_prefix: mlops +preprocessing: + run: true + input: + AppSpecification: + ImageUri: IMAGE_URI + ContainerEntrypoint: + ["python3", "/opt/ml/processing/code/preprocessing.py"] + ProcessingResources: + ClusterConfig: + InstanceType: ml.m5.xlarge + InstanceCount: 1 + VolumeSizeInGB: 50 + ProcessingInputs: + - InputName: input + AppManaged: false + S3Input: + S3Uri: s3://sagemaker-sample-data-AWS_REGION_NAME/processing/census + LocalPath: /opt/ml/processing/input + S3DataType: S3Prefix + S3InputMode: File + S3DataDistributionType: FullyReplicated + - InputName: Code + AppManaged: false + S3Input: + S3Uri: s3://MLOPS_BUCKET/MODEL_NAME/scripts + LocalPath: /opt/ml/processing/code + S3DataType: S3Prefix + S3InputMode: File + S3DataDistributionType: FullyReplicated + ProcessingOutputConfig: + Outputs: + - OutputName: train + AppManaged: false + S3Output: + S3Uri: s3://MLOPS_BUCKET/MODEL_NAME/processing/train + LocalPath: /opt/ml/processing/train + S3UploadMode: EndOfJob + - OutputName: test + AppManaged: false + S3Output: + S3Uri: s3://MLOPS_BUCKET/MODEL_NAME/processing/test + LocalPath: /opt/ml/processing/test + S3UploadMode: EndOfJob + StoppingCondition: + MaxRuntimeInSeconds: 3600 + AppManaged: false + Tags: + - Key: APP_ID + Value: aiops + Environment: + NetworkConfig: + RoleArn: SAGEMAKER_EXECUTION_ROLE + +training: + run: true + input: + AlgorithmSpecification: + TrainingImage: IMAGE_URI + ContainerEntrypoint: ["python3", "/opt/ml/input/data/code/train.py"] + TrainingInputMode: FastFile + HyperParameters: + ResourceConfig: + InstanceType: ml.m5.xlarge + InstanceCount: 1 + VolumeSizeInGB: 50 + InputDataConfig: + - ChannelName: training + DataSource: + S3DataSource: + S3DataType: S3Prefix + S3Uri: s3://MLOPS_BUCKET/MODEL_NAME/processing/train + S3DataDistributionType: FullyReplicated + - ChannelName: code + DataSource: + S3DataSource: + S3DataType: S3Prefix + S3Uri: s3://MLOPS_BUCKET/MODEL_NAME/scripts + S3DataDistributionType: FullyReplicated + OutputDataConfig: + S3OutputPath: s3://MLOPS_BUCKET/MODEL_NAME/model/ + StoppingCondition: + MaxRuntimeInSeconds: 3600 + Tags: + - Key: APP_ID + Value: aiops + Environment: + RetryStrategy: + VpcConfig: + RoleArn: SAGEMAKER_EXECUTION_ROLE + +evaluation: + run: true + input: + AppSpecification: + ImageUri: IMAGE_URI + ContainerEntrypoint: ["python3", "/opt/ml/processing/code/evaluation.py"] + ProcessingResources: + ClusterConfig: + InstanceType: ml.m5.xlarge + InstanceCount: 1 + VolumeSizeInGB: 50 + ProcessingInputs: + - InputName: input + AppManaged: false + S3Input: + S3Uri: s3://MLOPS_BUCKET/MODEL_NAME/model/SFN_EXECUTION_ID/output/model.tar.gz + LocalPath: /opt/ml/processing/model + S3DataType: S3Prefix + S3InputMode: File + S3DataDistributionType: FullyReplicated + - InputName: Code + AppManaged: false + S3Input: + S3Uri: s3://MLOPS_BUCKET/MODEL_NAME/scripts + LocalPath: /opt/ml/processing/code + S3DataType: S3Prefix + S3InputMode: File + S3DataDistributionType: FullyReplicated + - InputName: test + AppManaged: false + S3Input: + S3Uri: s3://MLOPS_BUCKET/MODEL_NAME/processing/test + LocalPath: /opt/ml/processing/test + S3DataType: S3Prefix + S3InputMode: File + S3DataDistributionType: FullyReplicated + ProcessingOutputConfig: + Outputs: + - OutputName: evaluation + AppManaged: false + S3Output: + S3Uri: s3://MLOPS_BUCKET/MODEL_NAME/evaluation/output + LocalPath: /opt/ml/processing/evaluation + S3UploadMode: EndOfJob + StoppingCondition: + MaxRuntimeInSeconds: 3600 + AppManaged: false + Tags: + - Key: APP_ID + Value: aiops + Environment: + NetworkConfig: + RoleArn: SAGEMAKER_EXECUTION_ROLE + +CreateModel: + run: true + input: + EnableNetworkIsolation: + Containers: + VpcConfig: + PrimaryContainer: + Image: IMAGE_URI + ModelDataUrl: s3://MLOPS_BUCKET/MODEL_NAME/model/SFN_EXECUTION_ID/output/model.tar.gz + Environment: + SAGEMAKER_PROGRAM: inference.py + SAGEMAKER_SUBMIT_DIRECTORY: s3://MLOPS_BUCKET/MODEL_NAME/scripts/source.tar.gz + + ExecutionRoleArn: SAGEMAKER_EXECUTION_ROLE + +batchTransform: + run: true + input: + BatchStrategy: MultiRecord + Environment: + APP_ID: aiops + MaxConcurrentTransforms: 2 + MaxPayloadInMB: 50 + TransformInput: + ContentType: text/csv + SplitType: Line + DataSource: + S3DataSource: + S3DataType: S3Prefix + S3Uri: s3://MLOPS_BUCKET/MODEL_NAME/processing/test/test_features.csv + TransformOutput: + Accept: text/csv + AssembleWith: Line + S3OutputPath: s3://MLOPS_BUCKET/MODEL_NAME/batch-output/SFN_EXECUTION_ID/ + TransformResources: + InstanceType: ml.m5.xlarge + InstanceCount: 1 + Tags: + - Key: APP_ID + Value: aiops diff --git a/modules/examples/mlops-stepfunctions/scripts/preprocessing.py b/modules/examples/mlops-stepfunctions/scripts/preprocessing.py new file mode 100644 index 00000000..e1246f3c --- /dev/null +++ b/modules/examples/mlops-stepfunctions/scripts/preprocessing.py @@ -0,0 +1,108 @@ +import argparse +import os +import warnings + +import numpy as np +import pandas as pd +from sklearn.compose import make_column_transformer # type: ignore[import-untyped] +from sklearn.exceptions import DataConversionWarning # type: ignore[import-untyped] +from sklearn.model_selection import train_test_split # type: ignore[import-untyped] +from sklearn.preprocessing import ( # type: ignore[import-untyped] + KBinsDiscretizer, + OneHotEncoder, + StandardScaler, +) + +warnings.filterwarnings(action="ignore", category=DataConversionWarning) + + +columns = [ + "age", + "education", + "major industry code", + "class of worker", + "num persons worked for employer", + "capital gains", + "capital losses", + "dividends from stocks", + "income", +] +class_labels = [" - 50000.", " 50000+."] + + +def print_shape(df): # type: ignore[no-untyped-def] + negative_examples, positive_examples = np.bincount(df["income"]) + print( + "Data shape: {}, {} positive examples, {} negative examples".format( + df.shape, positive_examples, negative_examples + ) + ) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--train-test-split-ratio", type=float, default=0.3) + args, _ = parser.parse_known_args() + + print("Received arguments {}".format(args)) + + input_data_path = os.path.join("/opt/ml/processing/input", "census-income.csv") + + print("Reading input data from {}".format(input_data_path)) + df = pd.read_csv(input_data_path) + df = pd.DataFrame(data=df, columns=columns) + df.dropna(inplace=True) + df.drop_duplicates(inplace=True) + df.replace(class_labels, [0, 1], inplace=True) + + negative_examples, positive_examples = np.bincount(df["income"]) + print( + "Data after cleaning: {}, {} positive examples, {} negative examples".format( + df.shape, positive_examples, negative_examples + ) + ) + + split_ratio = args.train_test_split_ratio + print("Splitting data into train and test sets with ratio {}".format(split_ratio)) + X_train, X_test, y_train, y_test = train_test_split( + df.drop("income", axis=1), df["income"], test_size=split_ratio, random_state=0 + ) + + preprocess = make_column_transformer( + ( + KBinsDiscretizer(encode="onehot-dense", n_bins=10), + ["age", "num persons worked for employer"], + ), + ( + StandardScaler(), + ["capital gains", "capital losses", "dividends from stocks"], + ), + ( + OneHotEncoder(sparse=False), + ["education", "major industry code", "class of worker"], + ), + ) + print("Running preprocessing and feature engineering transformations") + train_features = preprocess.fit_transform(X_train) + test_features = preprocess.transform(X_test) + + print("Train data shape after preprocessing: {}".format(train_features.shape)) + print("Test data shape after preprocessing: {}".format(test_features.shape)) + + train_features_output_path = os.path.join("/opt/ml/processing/train", "train_features.csv") + train_labels_output_path = os.path.join("/opt/ml/processing/train", "train_labels.csv") + + test_features_output_path = os.path.join("/opt/ml/processing/test", "test_features.csv") + test_labels_output_path = os.path.join("/opt/ml/processing/test", "test_labels.csv") + + print("Saving training features to {}".format(train_features_output_path)) + pd.DataFrame(train_features).to_csv(train_features_output_path, header=False, index=False) + + print("Saving test features to {}".format(test_features_output_path)) + pd.DataFrame(test_features).to_csv(test_features_output_path, header=False, index=False) + + print("Saving training labels to {}".format(train_labels_output_path)) + y_train.to_csv(train_labels_output_path, header=False, index=False) + + print("Saving test labels to {}".format(test_labels_output_path)) + y_test.to_csv(test_labels_output_path, header=False, index=False) diff --git a/modules/examples/mlops-stepfunctions/scripts/train.py b/modules/examples/mlops-stepfunctions/scripts/train.py new file mode 100644 index 00000000..5c809c89 --- /dev/null +++ b/modules/examples/mlops-stepfunctions/scripts/train.py @@ -0,0 +1,22 @@ +import os + +import joblib +import pandas as pd +from sklearn.linear_model import LogisticRegression # type: ignore[import-untyped] + +if __name__ == "__main__": + print("Starting training") + model_dir = "/opt/ml/model" + training_data_directory = "/opt/ml/input/data/training" + train_features_data = os.path.join(training_data_directory, "train_features.csv") + train_labels_data = os.path.join(training_data_directory, "train_labels.csv") + print("Reading input data") + X_train = pd.read_csv(train_features_data, header=None) + y_train = pd.read_csv(train_labels_data, header=None) + + model = LogisticRegression(class_weight="balanced", solver="lbfgs") + print("Training LR model") + model.fit(X_train, y_train) + model_output_directory = os.path.join(model_dir, "model.joblib") + print("Saving model to {}".format(model_output_directory)) + joblib.dump(model, model_output_directory) diff --git a/modules/examples/mlops-stepfunctions/settings.py b/modules/examples/mlops-stepfunctions/settings.py new file mode 100644 index 00000000..4aa6b6dd --- /dev/null +++ b/modules/examples/mlops-stepfunctions/settings.py @@ -0,0 +1,72 @@ +"""Defines the stack settings.""" + +from abc import ABC +from typing import Dict, Optional + +from pydantic import Field, computed_field +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class CdkBaseSettings(BaseSettings, ABC): + """Defines common configuration for settings.""" + + model_config = SettingsConfigDict( + case_sensitive=False, + env_nested_delimiter="__", + protected_namespaces=(), + extra="ignore", + populate_by_name=True, + ) + + +class ModuleSettings(CdkBaseSettings): + """Seedfarmer Parameters. + + These parameters are required for the module stack. + """ + + model_config = SettingsConfigDict(env_prefix="SEEDFARMER_PARAMETER_") + + model_name: str = Field(default="demo") + schedule: str + tags: Optional[Dict[str, str]] = Field(default=None) + + +class SeedFarmerSettings(CdkBaseSettings): + """Seedfarmer Settings. + + These parameters comes from seedfarmer by default. + """ + + model_config = SettingsConfigDict(env_prefix="SEEDFARMER_") + + project_name: str = Field(default="") + deployment_name: str = Field(default="") + module_name: str = Field(default="") + + @computed_field # type: ignore + @property + def app_prefix(self) -> str: + """Application prefix.""" + prefix = "-".join([self.project_name, self.deployment_name, self.module_name]) + return prefix + + +class CDKSettings(CdkBaseSettings): + """CDK Default Settings. + + These parameters comes from AWS CDK by default. + """ + + model_config = SettingsConfigDict(env_prefix="CDK_DEFAULT_") + + account: str + region: str + + +class ApplicationSettings(CdkBaseSettings): + """Application settings.""" + + seedfarmer_settings: SeedFarmerSettings = Field(default_factory=SeedFarmerSettings) + module_settings: ModuleSettings = Field(default_factory=ModuleSettings) + cdk_settings: CDKSettings = Field(default_factory=CDKSettings) diff --git a/modules/examples/mlops-stepfunctions/stack.py b/modules/examples/mlops-stepfunctions/stack.py new file mode 100755 index 00000000..f35353b7 --- /dev/null +++ b/modules/examples/mlops-stepfunctions/stack.py @@ -0,0 +1,225 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: Apache-2.0 + +import logging +from typing import Any + +import aws_cdk.aws_events as events +import aws_cdk.aws_events_targets as events_targets +import aws_cdk.aws_iam as aws_iam +import aws_cdk.aws_s3 as aws_s3 +import aws_cdk.aws_stepfunctions as sfn +from aws_cdk import Aws, Duration, RemovalPolicy, Stack +from aws_cdk.aws_lambda import Runtime +from aws_cdk.aws_lambda_python_alpha import PythonFunction +from cdk_nag import NagPackSuppression, NagSuppressions +from constructs import Construct + +_logger: logging.Logger = logging.getLogger(__name__) + + +class MLOPSSFNResources(Stack): + def __init__( + self, + scope: Construct, + id: str, + *, + project_name: str, + deployment_name: str, + module_name: str, + model_name: str, + schedule: str, + **kwargs: Any, + ) -> None: + # MLOPS Env vars + self.deployment_name = deployment_name + self.module_name = module_name + + super().__init__( + scope, + id, + description="This stack deploys Example DAGs resources for MLOps", + **kwargs, + ) + account: str = Aws.ACCOUNT_ID + region: str = Aws.REGION + + mlops_assets_bucket = aws_s3.Bucket( + self, + id="mlops-sfn-assets-bucket", + versioned=False, + # bucket_name=f"{dep_mod}-{account}-{region}", + removal_policy=RemovalPolicy.DESTROY, + encryption=aws_s3.BucketEncryption.KMS_MANAGED, + block_public_access=aws_s3.BlockPublicAccess.BLOCK_ALL, + enforce_ssl=True, + ) + + self.mlops_assets_bucket = mlops_assets_bucket + + # Create Dag IAM Role and policy + s3_access_statements = aws_iam.PolicyDocument( + statements=[ + aws_iam.PolicyStatement( + actions=["s3:List*", "s3:Get*"], + effect=aws_iam.Effect.ALLOW, + resources=[ + mlops_assets_bucket.bucket_arn, + f"{mlops_assets_bucket.bucket_arn}/*", + ], + ) + ] + ) + + # create a role for lambda function + lambda_role = aws_iam.Role( + self, + "LambdaRole", + assumed_by=aws_iam.ServicePrincipal("lambda.amazonaws.com"), + managed_policies=[ + aws_iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AWSLambdaBasicExecutionRole"), + ], + # role_name=r_name, + path="/", + ) + + # Create the Step Functions Execution Role + sfn_rule_statements = aws_iam.PolicyDocument( + statements=[ + aws_iam.PolicyStatement( + actions=[ + "events:PutTargets", + "events:PutRule", + "events:DescribeRule", + ], + effect=aws_iam.Effect.ALLOW, + resources=[f"arn:aws:events:{region}:{account}:rule/StepFunctions*"], + ) + ] + ) + + sfn_exec_role = aws_iam.Role( + self, + "StepFunctionsExecutionRole", + assumed_by=aws_iam.ServicePrincipal("states.amazonaws.com"), + managed_policies=[ + aws_iam.ManagedPolicy.from_aws_managed_policy_name("service-role/AWSLambdaRole"), + aws_iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess"), + aws_iam.ManagedPolicy.from_aws_managed_policy_name("AmazonS3ReadOnlyAccess"), + ], + inline_policies={ + "SfnSMRulePolicy": sfn_rule_statements, + }, + ) + # sfn_exec_role.add_managed_policy(sfn_rule_policy) + + self.sfn_exec_role = sfn_exec_role + + # Define the IAM role + sagemaker_execution_role = aws_iam.Role( + self, + "SageMakerExecutionRole", + assumed_by=aws_iam.ServicePrincipal("sagemaker.amazonaws.com"), + managed_policies=[aws_iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSageMakerFullAccess")], + path="/", + # role_name=f"SageMakerExecutionRole-{self.stack_name}", + ) + + # Add policy to allow access to S3 bucket and IAM pass role + mlops_assets_bucket.grant_read_write(sagemaker_execution_role) + mlops_assets_bucket.grant_read(sfn_exec_role) + sagemaker_execution_role.grant_pass_role(sfn_exec_role) + + self.sagemaker_execution_role = sagemaker_execution_role + # self.state_machine_arn = sagemaker_execution_role + + # Create the Step Functions State Machine + state_machine = sfn.StateMachine( + self, + "StateMachine", + definition_body=sfn.DefinitionBody.from_file("./state_machine.json"), + state_machine_type=sfn.StateMachineType.STANDARD, + role=sfn_exec_role, + ) + self.state_machine_arn = state_machine.state_machine_arn + + sfn_execution_for_lambda = aws_iam.PolicyDocument( + statements=[ + aws_iam.PolicyStatement( + actions=["states:StartExecution"], + effect=aws_iam.Effect.ALLOW, + resources=[state_machine.state_machine_arn], + ) + ] + ) + + # create a lambda function with python runtime and install dependencies from requirementes with docker + lambda_function = PythonFunction( + self, + "LambdaFunction", + entry="lambda", + runtime=Runtime.PYTHON_3_12, + index="handler.py", + handler="lambda_handler", + role=lambda_role, + environment={"STATE_MACHINE_ARN": state_machine.state_machine_arn}, + timeout=Duration.seconds(60), + ) + self.lambda_function_arn = lambda_function.function_arn + + lambda_role.attach_inline_policy(aws_iam.Policy(self, "SFNExecutionPolicy", document=sfn_execution_for_lambda)) + lambda_role.attach_inline_policy(aws_iam.Policy(self, "S3AccessRole", document=s3_access_statements)) + + # Create the EventBridge rule + + event_rule = events.Rule( + self, + "MyEventRule", + schedule=events.Schedule.expression(f"cron({schedule})"), + ) + # Define the custom input as an event + + custom_input = { + "config": { + "bucket": mlops_assets_bucket.bucket_name, + "prefix": f"{model_name}/scripts/input.yaml", + } + # Add more key-value pairs as needed + } + event_rule.add_target( + events_targets.LambdaFunction( + lambda_function, + event=events.RuleTargetInput.from_object(custom_input), + ) + ) + + NagSuppressions.add_resource_suppressions( + self, + apply_to_children=True, + suppressions=[ + NagPackSuppression( + id="AwsSolutions-S1", + reason="Logs are disabled for demo purposes", + ), + NagPackSuppression( + id="AwsSolutions-S5", + reason="No OAI needed - no one is accessing this data without explicit permissions", + ), + NagPackSuppression( + id="AwsSolutions-IAM5", + reason="Resource access restricted to MLOPS resources.", + ), + NagPackSuppression( + id="AwsSolutions-IAM4", + reason="Managed Policies are for service account roles only", + ), + NagPackSuppression( + id="AwsSolutions-SF1", + reason="Logs are disabled for demo purposes", + ), + NagPackSuppression( + id="AwsSolutions-SF2", + reason="X-Ray is disabled for demo purposes", + ), + ], + ) diff --git a/modules/examples/mlops-stepfunctions/state_machine.json b/modules/examples/mlops-stepfunctions/state_machine.json new file mode 100644 index 00000000..1bb40c62 --- /dev/null +++ b/modules/examples/mlops-stepfunctions/state_machine.json @@ -0,0 +1,152 @@ +{ + "StartAt": "CheckPreProcessingRun", + "States": { + "CheckPreProcessingRun": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.preprocessing.run", + "BooleanEquals": true, + "Next": "PreProcessingJob" + } + ], + "Default": "CheckTrainingRun" + }, + "PreProcessingJob": { + "Type": "Task", + "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync", + "Parameters": { + "AppSpecification.$": "$.preprocessing.input.AppSpecification", + "Environment.$": "$.preprocessing.input.Environment", + "NetworkConfig.$": "$.preprocessing.input.NetworkConfig", + "ProcessingInputs.$": "$.preprocessing.input.ProcessingInputs", + "ProcessingOutputConfig.$": "$.preprocessing.input.ProcessingOutputConfig", + "ProcessingResources.$": "$.preprocessing.input.ProcessingResources", + "RoleArn.$": "$.preprocessing.input.RoleArn", + "StoppingCondition.$": "$.preprocessing.input.StoppingCondition", + "Tags.$": "$.preprocessing.input.Tags", + "ProcessingJobName.$": "$$.Execution.Name" + }, + "ResultPath": "$.preprocessing.result", + "Next": "CheckTrainingRun" + }, + "CheckTrainingRun": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.training.run", + "BooleanEquals": true, + "Next": "TrainingJob" + } + ], + "Default": "CheckEvaluationRun" + }, + "TrainingJob": { + "Type": "Task", + "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync", + "ResultPath": "$.training.result", + "Parameters": { + "AlgorithmSpecification.$": "$.training.input.AlgorithmSpecification", + "HyperParameters.$": "$.training.input.HyperParameters", + "InputDataConfig.$": "$.training.input.InputDataConfig", + "OutputDataConfig.$": "$.training.input.OutputDataConfig", + "ResourceConfig.$": "$.training.input.ResourceConfig", + "RoleArn.$": "$.training.input.RoleArn", + "Environment.$": "$.training.input.Environment", + "StoppingCondition.$": "$.training.input.StoppingCondition", + "Tags.$": "$.training.input.Tags", + "RetryStrategy.$": "$.training.input.RetryStrategy", + "VpcConfig.$": "$.training.input.VpcConfig", + "TrainingJobName.$": "$$.Execution.Name" + }, + "Next": "CheckEvaluationRun" + }, + "CheckEvaluationRun": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.evaluation.run", + "BooleanEquals": true, + "Next": "EvaluationJob" + } + ], + "Default": "CreateModelCheck" + }, + "EvaluationJob": { + "Type": "Task", + "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync", + "ResultPath": "$.evaluation.result", + "Parameters": { + "AppSpecification.$": "$.evaluation.input.AppSpecification", + "Environment.$": "$.evaluation.input.Environment", + "NetworkConfig.$": "$.evaluation.input.NetworkConfig", + "ProcessingInputs.$": "$.evaluation.input.ProcessingInputs", + "ProcessingOutputConfig.$": "$.evaluation.input.ProcessingOutputConfig", + "ProcessingResources.$": "$.evaluation.input.ProcessingResources", + "RoleArn.$": "$.evaluation.input.RoleArn", + "StoppingCondition.$": "$.evaluation.input.StoppingCondition", + "Tags.$": "$.evaluation.input.Tags", + "ProcessingJobName.$": "States.Format('evaluation-{}', $$.Execution.Name)" + }, + "Next": "CreateModelCheck" + }, + "CreateModelCheck": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.CreateModel.run", + "BooleanEquals": true, + "Next": "CreateModelTask" + } + ], + "Default": "CheckBatchTransformRun" + }, + "CreateModelTask": { + "Parameters": { + "Containers.$": "$.CreateModel.input.Containers", + "EnableNetworkIsolation.$": "$.CreateModel.input.EnableNetworkIsolation", + "PrimaryContainer.$": "$.CreateModel.input.PrimaryContainer", + "VpcConfig.$": "$.CreateModel.input.VpcConfig", + "ExecutionRoleArn.$": "$.CreateModel.input.ExecutionRoleArn", + "ModelName.$": "States.Format('model-{}', $$.Execution.Name)" + }, + "Resource": "arn:aws:states:::sagemaker:createModel", + "Type": "Task", + "ResultPath": "$.CreateModel.result", + "Next": "CheckBatchTransformRun" + }, + "CheckBatchTransformRun": { + "Type": "Choice", + "Choices": [ + { + "Variable": "$.batchTransform.run", + "BooleanEquals": true, + "Next": "BatchTransformJob" + } + ], + "Default": "Pass" + }, + "BatchTransformJob": { + "Type": "Task", + "Resource": "arn:aws:states:::sagemaker:createTransformJob.sync", + "ResultPath": "$.batchTransform.result", + "Parameters": { + "ModelName.$": "States.Format('model-{}', $$.Execution.Name)", + "BatchStrategy.$": "$.batchTransform.input.BatchStrategy", + "Environment.$": "$.batchTransform.input.Environment", + "MaxConcurrentTransforms.$": "$.batchTransform.input.MaxConcurrentTransforms", + "MaxPayloadInMB.$": "$.batchTransform.input.MaxPayloadInMB ", + "Tags.$": "$.batchTransform.input.Tags ", + "TransformInput.$": "$.batchTransform.input.TransformInput", + "TransformOutput.$": "$.batchTransform.input.TransformOutput", + "TransformResources.$": "$.batchTransform.input.TransformResources", + "TransformJobName.$": "States.Format('batch-{}', $$.Execution.Name)" + }, + "Next": "Pass" + }, + "Pass": { + "Type": "Pass", + "End": true + } + } +} \ No newline at end of file diff --git a/modules/examples/mlops-stepfunctions/tests/__init__.py b/modules/examples/mlops-stepfunctions/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/modules/examples/mlops-stepfunctions/tests/test_app.py b/modules/examples/mlops-stepfunctions/tests/test_app.py new file mode 100644 index 00000000..8c17937e --- /dev/null +++ b/modules/examples/mlops-stepfunctions/tests/test_app.py @@ -0,0 +1,26 @@ +import os +import sys + +import pytest + + +@pytest.fixture(scope="function") +def stack_defaults() -> None: + os.environ["SEEDFARMER_PROJECT_NAME"] = "test-project" + os.environ["SEEDFARMER_DEPLOYMENT_NAME"] = "test-deployment" + os.environ["SEEDFARMER_MODULE_NAME"] = "test-module" + os.environ["CDK_DEFAULT_ACCOUNT"] = "111111111111" + os.environ["CDK_DEFAULT_REGION"] = "us-east-1" + + os.environ["SEEDFARMER_PARAMETER_BUCKET_POLICY_ARN"] = "12345" + os.environ["SEEDFARMER_PERMISSION_BOUNDARY_ARN"] = "sagemaker-project" + + os.environ["SEEDFARMER_PARAMETER_SCHEDULE"] = "0 6 * * ? *" + + # Unload the app import so that subsequent tests don't reuse + if "app" in sys.modules: + del sys.modules["app"] + + +def test_app(stack_defaults): # type: ignore[no-untyped-def] + import app # noqa: F401 diff --git a/modules/examples/mlops-stepfunctions/tests/test_stack.py b/modules/examples/mlops-stepfunctions/tests/test_stack.py new file mode 100644 index 00000000..e11b4e7f --- /dev/null +++ b/modules/examples/mlops-stepfunctions/tests/test_stack.py @@ -0,0 +1,64 @@ +import os +import sys + +import aws_cdk as cdk +import cdk_nag +import pytest +from aws_cdk.assertions import Annotations, Match, Template + + +@pytest.fixture(scope="function") +def stack_defaults() -> None: + os.environ["CDK_DEFAULT_ACCOUNT"] = "111111111111" + os.environ["CDK_DEFAULT_REGION"] = "us-east-1" + + # Unload the app import so that subsequent tests don't reuse + if "stack" in sys.modules: + del sys.modules["stack"] + + +@pytest.fixture(scope="function") +def stack_model_package_input() -> cdk.Stack: + import stack + + app = cdk.App() + + project_name = "test-project" + deployment_name = "test-deployment" + module_name = "test-module" + + app_prefix = f"{project_name}-{deployment_name}-{module_name}" + + return stack.MLOPSSFNResources( + scope=app, + id=app_prefix, + project_name=project_name, + deployment_name=deployment_name, + module_name=module_name, + model_name="demo", + schedule="0 6 * * ? *", + env=cdk.Environment( + account=os.environ["CDK_DEFAULT_ACCOUNT"], + region=os.environ["CDK_DEFAULT_REGION"], + ), + ) + + +@pytest.fixture(params=["stack_model_package_input"], scope="function") +def stack(request, stack_model_package_input) -> cdk.Stack: # type: ignore[no-untyped-def] + return request.getfixturevalue(request.param) # type: ignore[no-any-return] + + +def test_synthesize_stack(stack: cdk.Stack) -> None: + template = Template.from_stack(stack) + template.resource_count_is("AWS::S3::Bucket", 1) + + +def test_no_cdk_nag_errors(stack: cdk.Stack) -> None: + cdk.Aspects.of(stack).add(cdk_nag.AwsSolutionsChecks()) + + nag_errors = Annotations.from_stack(stack).find_error( + "*", + Match.string_like_regexp(r"AwsSolutions-.*"), + ) + assert not nag_errors, f"Found {len(nag_errors)} CDK nag errors"