Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
knakazawa99 authored Mar 17, 2024
2 parents 705ffe4 + 5fc8a3a commit 993076b
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 70 deletions.
2 changes: 1 addition & 1 deletion docs/gen_examples_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def get_code_examples(obj: Union[ModuleType, Callable]) -> Set[str]:
for section in parsed_sections:
if section.kind == DocstringSectionKind.examples:
code_example = "\n".join(
(part[1] for part in section.as_dict().get("value", []))
part[1] for part in section.as_dict().get("value", [])
)
if not skip_block_load_code_example(code_example):
code_examples.add(code_example)
Expand Down
4 changes: 2 additions & 2 deletions prefect_aws/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ async def batch_submit(
Args:
job_name: The AWS batch job name.
job_definition: The AWS batch job definition.
job_queue: Name of the AWS batch job queue.
job_definition: The AWS batch job definition.
aws_credentials: Credentials to use for authentication with AWS.
**batch_kwargs: Additional keyword arguments to pass to the boto3
`submit_job` function. See the documentation for
Expand All @@ -49,8 +49,8 @@ def example_batch_submit_flow():
)
job_id = batch_submit(
"job_name",
"job_definition",
"job_queue",
"job_definition",
aws_credentials
)
return job_id
Expand Down
38 changes: 37 additions & 1 deletion prefect_aws/ecs.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
"""
DEPRECATION WARNING:
This module is deprecated as of March 2024 and will not be available after September 2024.
It has been replaced by the ECS worker, which offers enhanced functionality and better performance.
For upgrade instructions, see https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/.
Integrations with the Amazon Elastic Container Service.
Examples:
Expand Down Expand Up @@ -102,7 +109,8 @@
],
)
```
"""
""" # noqa

import copy
import difflib
import json
Expand All @@ -118,6 +126,7 @@
import yaml
from anyio.abc import TaskStatus
from jsonpointer import JsonPointerException
from prefect._internal.compatibility.deprecated import deprecated_class
from prefect.blocks.core import BlockNotSavedError
from prefect.exceptions import InfrastructureNotAvailable, InfrastructureNotFound
from prefect.infrastructure.base import Infrastructure, InfrastructureResult
Expand All @@ -126,6 +135,8 @@
from prefect.utilities.pydantic import JsonPatch
from pydantic import VERSION as PYDANTIC_VERSION

from prefect_aws.utilities import assemble_document_for_patches

if PYDANTIC_VERSION.startswith("2."):
from pydantic.v1 import Field, root_validator, validator
else:
Expand Down Expand Up @@ -203,6 +214,14 @@ def _pretty_diff(d1: dict, d2: dict) -> str:
)


@deprecated_class(
start_date="Mar 2024",
help=(
"Use the ECS worker instead."
" Refer to the upgrade guide for more information:"
" https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/."
),
)
class ECSTask(Infrastructure):
"""
Run a command as an ECS task.
Expand Down Expand Up @@ -739,6 +758,23 @@ async def generate_work_pool_base_job_template(self) -> dict:
)

if self.task_customizations:
network_config_patches = JsonPatch(
[
patch
for patch in self.task_customizations
if "networkConfiguration" in patch["path"]
]
)
minimal_network_config = assemble_document_for_patches(
network_config_patches
)
if minimal_network_config:
minimal_network_config_with_patches = network_config_patches.apply(
minimal_network_config
)
base_job_template["variables"]["properties"]["network_configuration"][
"default"
] = minimal_network_config_with_patches["networkConfiguration"]
try:
base_job_template["job_configuration"]["task_run_request"] = (
self.task_customizations.apply(
Expand Down
81 changes: 81 additions & 0 deletions prefect_aws/utilities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Utilities for working with AWS services."""

from typing import Dict, List, Union

from prefect.utilities.collections import visit_collection


Expand Down Expand Up @@ -33,3 +35,82 @@ def make_hashable(item):
collection, visit_fn=make_hashable, return_data=True
)
return hash(hashable_collection)


def ensure_path_exists(doc: Union[Dict, List], path: List[str]):
"""
Ensures the path exists in the document, creating empty dictionaries or lists as
needed.
Args:
doc: The current level of the document or sub-document.
path: The remaining path parts to ensure exist.
"""
if not path:
return
current_path = path.pop(0)
# Check if the next path part exists and is a digit
next_path_is_digit = path and path[0].isdigit()

# Determine if the current path is for an array or an object
if isinstance(doc, list): # Path is for an array index
current_path = int(current_path)
# Ensure the current level of the document is a list and long enough

while len(doc) <= current_path:
doc.append({})
next_level = doc[current_path]
else: # Path is for an object
if current_path not in doc or (
next_path_is_digit and not isinstance(doc.get(current_path), list)
):
doc[current_path] = [] if next_path_is_digit else {}
next_level = doc[current_path]

ensure_path_exists(next_level, path)


def assemble_document_for_patches(patches):
"""
Assembles an initial document that can successfully accept the given JSON Patch
operations.
Args:
patches: A list of JSON Patch operations.
Returns:
An initial document structured to accept the patches.
Example:
```python
patches = [
{"op": "replace", "path": "/name", "value": "Jane"},
{"op": "add", "path": "/contact/address", "value": "123 Main St"},
{"op": "remove", "path": "/age"}
]
initial_document = assemble_document_for_patches(patches)
#output
{
"name": {},
"contact": {},
"age": {}
}
```
"""
document = {}

for patch in patches:
operation = patch["op"]
path = patch["path"].lstrip("/").split("/")

if operation == "add":
# Ensure all but the last element of the path exists
ensure_path_exists(document, path[:-1])
elif operation in ["remove", "replace"]:
# For remove adn replace, the entire path should exist
ensure_path_exists(document, path)

return document
Loading

0 comments on commit 993076b

Please sign in to comment.