Skip to content

Commit

Permalink
Merge branch 'PrefectHQ:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
gnoluna authored Jan 4, 2024
2 parents a99e6a9 + 50d5f63 commit c27b2c1
Show file tree
Hide file tree
Showing 20 changed files with 855 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: 3.7
python-version: 3.8

- name: Install packages
run: |
Expand Down
52 changes: 52 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,58 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Removed

## 0.4.7

Released January 3rd, 2024.

### Added

- `LambdaFunction` block to invoke lambda functions - [#355](https://github.com/PrefectHQ/prefect-aws/pull/355)

### Fixed

- Bug where `S3Bucket.load()` constructed `AwsCredentials` instead of `MinIOCredentials` - [#359](https://github.com/PrefectHQ/prefect-aws/pull/359)

## 0.4.6

Released December 11th, 2023.

### Added

Ability to publish `ECSTask`` block as an ecs work pool - [#353](https://github.com/PrefectHQ/prefect-aws/pull/353)

## 0.4.5

Released November 30th, 2023.

### Fixed

- Bug where Prefect API key provided to ECS tasks was masked - [#347](https://github.com/PrefectHQ/prefect-aws/pull/347)

## 0.4.4

Released November 29th, 2023.

### Changed

- Mask Prefect API key in logs - [#341](https://github.com/PrefectHQ/prefect-aws/pull/341)

## 0.4.3

Released November 13th, 2023.

### Added

- `SecretBrinary` suport to `AwsSecret` block - [#274](https://github.com/PrefectHQ/prefect-aws/pull/274)

## 0.4.2

Released November 6th, 2023.

### Fixed

- Fixed use_ssl default for s3 client.

## 0.4.1

Released October 13th, 2023.
Expand Down
24 changes: 8 additions & 16 deletions docs/ecs_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ ECS (Elastic Container Service) tasks are a good option for executing Prefect 2

## ECS Flow Run Execution

Prefect enables remote flow execution via [workers](https://docs.prefect.io/2.11.1/concepts/work-pools/#worker-overview) and [work pools](https://docs.prefect.io/2.11.1/concepts/work-pools/#work-pool-overview). To learn more about these concepts please see our [deployment tutorial](https://docs.prefect.io/2.11.1/tutorial/deployments/).
Prefect enables remote flow execution via [workers](https://docs.prefect.io/concepts/work-pools/#worker-overview) and [work pools](https://docs.prefect.io/concepts/work-pools/#work-pool-overview). To learn more about these concepts please see our [deployment tutorial](https://docs.prefect.io/tutorial/deployments/).

For details on how workers and work pools are implemented for ECS, see the diagram below:
#### Architecture Diagram
Expand Down Expand Up @@ -182,22 +182,14 @@ To create an [IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_role
"containerDefinitions": [
{
"name": "prefect-worker",
"image": "prefecthq/prefect",
"image": "prefecthq/prefect:2-latest",
"cpu": 512,
"memory": 1024,
"essential": true,
"command": [
"pip",
"install",
"prefect-aws",
"&&",
"prefect",
"worker",
"start",
"--pool",
"my-ecs-pool",
"--type",
"ecs"
"/bin/sh",
"-c",
"pip install prefect-aws && prefect worker start --pool my-ecs-pool --type ecs"
],
"environment": [
{
Expand All @@ -218,7 +210,7 @@ To create an [IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_role

- For the `PREFECT_API_KEY`, individuals on the organization tier can create a [service account](https://docs.prefect.io/latest/cloud/users/service-accounts/) for the worker. If on a personal tier, you can pass a user’s API key.

- Replace `<your-ecs-task-role-arn>` with the ARN of the IAM role you created in Step 1.
- Replace both instances of `<your-ecs-task-role-arn>` with the ARN of the IAM role you created in Step 2.

- Notice that the CPU and Memory allocations are relatively small. The worker's main responsibility is to submit work through API calls to AWS, _not_ to execute your Prefect flow code.

Expand Down Expand Up @@ -296,10 +288,10 @@ To create an [IAM role](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_role
- Do your flow runs require higher `CPU`?
- Would an EC2 `Launch Type` speed up your flow run execution?

These infrastructure configuration values can be set on your ECS work pool or they can be overridden on the deployment level through [job_variables](https://docs.prefect.io/2.11.0/concepts/infrastructure/#kubernetesjob-overrides-and-customizations) if desired.
These infrastructure configuration values can be set on your ECS work pool or they can be overridden on the deployment level through [job_variables](https://docs.prefect.io/concepts/infrastructure/#kubernetesjob-overrides-and-customizations) if desired.


2. Consider adding a [build action](https://docs.prefect.io/2.11.0/concepts/deployments-ux/#the-build-action) to your Prefect Project [`prefect.yaml`](https://docs.prefect.io/2.11.0/concepts/deployments-ux/#the-prefect-yaml-file) if you want to automatically build a Docker image and push it to an image registry `prefect deploy` is run.
2. Consider adding a [build action](https://docs.prefect.io/concepts/deployments-ux/#the-build-action) to your Prefect Project [`prefect.yaml`](https://docs.prefect.io/concepts/deployments-ux/#the-prefect-yaml-file) if you want to automatically build a Docker image and push it to an image registry `prefect deploy` is run.

Here is an example build action for ECR:
```yaml
Expand Down
6 changes: 6 additions & 0 deletions docs/lambda_function.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
description: Module handling AWS Lambda functions
notes: This documentation page is generated from source file docstrings.
---

::: prefect_aws.lambda_function
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ nav:
- Credentials: credentials.md
- ECS Worker: ecs_worker.md
- ECS: ecs.md
- Lambda: lambda_function.md
- Deployments:
- Steps: deployments/steps.md
- S3: s3.md
Expand Down
2 changes: 2 additions & 0 deletions prefect_aws/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from . import _version
from .credentials import AwsCredentials, MinIOCredentials
from .client_parameters import AwsClientParameters
from .lambda_function import LambdaFunction
from .s3 import S3Bucket
from .ecs import ECSTask
from .secrets_manager import AwsSecret
Expand All @@ -17,6 +18,7 @@
__all__ = [
"AwsCredentials",
"AwsClientParameters",
"LambdaFunction",
"MinIOCredentials",
"S3Bucket",
"ECSTask",
Expand Down
2 changes: 1 addition & 1 deletion prefect_aws/deployments/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ def get_s3_client(
aws_client_parameters = credentials.get("aws_client_parameters", client_parameters)
api_version = aws_client_parameters.get("api_version", None)
endpoint_url = aws_client_parameters.get("endpoint_url", None)
use_ssl = aws_client_parameters.get("use_ssl", None)
use_ssl = aws_client_parameters.get("use_ssl", True)
verify = aws_client_parameters.get("verify", None)
config_params = aws_client_parameters.get("config", {})
config = Config(**config_params)
Expand Down
74 changes: 73 additions & 1 deletion prefect_aws/ecs.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import json
import logging
import pprint
import shlex
import sys
import time
import warnings
Expand All @@ -116,6 +117,8 @@
import boto3
import yaml
from anyio.abc import TaskStatus
from jsonpointer import JsonPointerException
from prefect.blocks.core import BlockNotSavedError
from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound
from prefect.infrastructure.base import Infrastructure, InfrastructureResult
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
Expand All @@ -132,7 +135,7 @@
from typing_extensions import Literal, Self

from prefect_aws import AwsCredentials
from prefect_aws.workers.ecs_worker import _TAG_REGEX
from prefect_aws.workers.ecs_worker import _TAG_REGEX, ECSWorker

# Internal type alias for ECS clients which are generated dynamically in botocore
_ECSClient = Any
Expand Down Expand Up @@ -681,6 +684,75 @@ async def kill(self, identifier: str, grace_seconds: int = 30) -> None:
cluster, task = parse_task_identifier(identifier)
await run_sync_in_worker_thread(self._stop_task, cluster, task)

@staticmethod
def get_corresponding_worker_type() -> str:
"""Return the corresponding worker type for this infrastructure block."""
return ECSWorker.type

async def generate_work_pool_base_job_template(self) -> dict:
"""
Generate a base job template for a cloud-run work pool with the same
configuration as this block.
Returns:
- dict: a base job template for a cloud-run work pool
"""
base_job_template = copy.deepcopy(ECSWorker.get_default_base_job_template())
for key, value in self.dict(exclude_unset=True, exclude_defaults=True).items():
if key == "command":
base_job_template["variables"]["properties"]["command"]["default"] = (
shlex.join(value)
)
elif key in [
"type",
"block_type_slug",
"_block_document_id",
"_block_document_name",
"_is_anonymous",
"task_customizations",
]:
continue
elif key == "aws_credentials":
if not self.aws_credentials._block_document_id:
raise BlockNotSavedError(
"It looks like you are trying to use a block that"
" has not been saved. Please call `.save` on your block"
" before publishing it as a work pool."
)
base_job_template["variables"]["properties"]["aws_credentials"][
"default"
] = {
"$ref": {
"block_document_id": str(
self.aws_credentials._block_document_id
)
}
}
elif key == "task_definition":
base_job_template["job_configuration"]["task_definition"] = value
elif key in base_job_template["variables"]["properties"]:
base_job_template["variables"]["properties"][key]["default"] = value
else:
self.logger.warning(
f"Variable {key!r} is not supported by Cloud Run work pools."
" Skipping."
)

if self.task_customizations:
try:
base_job_template["job_configuration"]["task_run_request"] = (
self.task_customizations.apply(
base_job_template["job_configuration"]["task_run_request"]
)
)
except JsonPointerException:
self.logger.warning(
"Unable to apply task customizations to the base job template."
"You may need to update the template manually."
)

return base_job_template

def _stop_task(self, cluster: str, task: str) -> None:
"""
Stop a running ECS task.
Expand Down
Loading

0 comments on commit c27b2c1

Please sign in to comment.