This repository has been archived by the owner on Apr 26, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 40
Feature Implementation: AWS Glue Job Execution Support #308
Merged
Merged
Changes from 2 commits
Commits
Show all changes
45 commits
Select commit
Hold shift + click to select a range
1f82b74
Add Glue Job
knakazawa99 4fb7a2c
Add Glue Job Doc
knakazawa99 8f9e9af
Fix Example of PyDoc
knakazawa99 9d6ac4f
Add pyparsing
knakazawa99 5af0057
Add Glue Job Worker
knakazawa99 5c46568
Change mkdocs
knakazawa99 7adaf5b
Add region to mock_aws_credentials
knakazawa99 d2183db
Add aws_default_region to mock_aws_credentials
knakazawa99 71552b9
Fix region name for glue job client test
knakazawa99 f44c66f
Merge branch 'main' into main
knakazawa99 956c7da
Merge branch 'main' into main
desertaxle c726304
Add Class Doc
knakazawa99 bae5e12
add pydantic v2 support
knakazawa99 71934c2
Remove optional type hint
knakazawa99 e71b947
Change dict type hint
knakazawa99 68b02b8
Add module comment
knakazawa99 3b501e2
Support pydantic v2
knakazawa99 1dceb73
Change region name
knakazawa99 8390a94
use aws_credentials in fixture
knakazawa99 6b3ff0b
use aws_credentials in test functions
knakazawa99 c0d611f
Merge branch 'main' into main
knakazawa99 3bd4a17
Remove Glue Job Worker
knakazawa99 7ea43d4
Remove Glue Job inherited from Infrastructure, Implement Glue Job Brock
knakazawa99 491191e
Merge remote-tracking branch 'origin/main'
knakazawa99 9e06097
Remove Glue Job Worker From Docs
knakazawa99 12299d4
Merge branch 'main' into main
knakazawa99 41367dd
Add aws_credentials to test_get_client
knakazawa99 0d04479
Merge remote-tracking branch 'origin/main'
knakazawa99 cb36769
Add aws_credentials to test_get_client
knakazawa99 7b9cbad
remove aws_credentials from test_get_client
knakazawa99 32a43c0
fix: ref https://github.com/PrefectHQ/prefect-aws/pull/308/commits/6b…
knakazawa99 18e71d7
Merge branch 'main' into main
desertaxle bd69c20
Merge branch 'main' into main
desertaxle 3bf96cb
Updates BaseModel import
desertaxle f086f03
Fixes syntax
desertaxle 7048c2a
Merge branch 'main' into main
desertaxle f3c7d2c
Refactor: Moved responsibilities of JobRun to JobBlock
knakazawa99 705ffe4
Merge remote-tracking branch 'origin/main'
knakazawa99 993076b
Merge branch 'main' into main
knakazawa99 d92238d
fix degrade
knakazawa99 5a4fd2d
fix for ut
knakazawa99 12781be
fix for ut
knakazawa99 d5c37e2
fix for ut
knakazawa99 9f71f03
Fix for generated doc
knakazawa99 474c045
Merge branch 'main' into main
desertaxle File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
--- | ||
description: Tasks for interacting with AWS Glue Job | ||
notes: This documentation page is generated from source file docstrings. | ||
--- | ||
|
||
::: prefect_aws.glue_job |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
""" | ||
Integrations with the AWS Glue Job. | ||
|
||
""" | ||
import time | ||
from typing import Any, Optional | ||
|
||
from prefect.infrastructure.base import Infrastructure, InfrastructureResult | ||
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible | ||
from pydantic import Field | ||
from typing_extensions import Literal | ||
|
||
from prefect_aws import AwsCredentials | ||
|
||
_GlueJobClient = Any | ||
|
||
|
||
class GlueJobResult(InfrastructureResult): | ||
"""The result of a run of a Glue Job""" | ||
|
||
|
||
class GlueJob(Infrastructure): | ||
""" | ||
Execute a job to the AWS Glue Job service. | ||
|
||
Attributes: | ||
job_name: The name of the job definition to use. | ||
arguments: The job arguments associated with this run. | ||
For this job run, they replace the default arguments set in the job | ||
definition itself. | ||
You can specify arguments here that your own job-execution script consumes, | ||
as well as arguments that Glue itself consumes. | ||
Job arguments may be logged. Do not pass plaintext secrets as arguments. | ||
Retrieve secrets from a Glue Connection, Secrets Manager or other secret | ||
management mechanism if you intend to keep them within the Job. | ||
[doc](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html) | ||
job_watch_poll_interval: The amount of time to wait between AWS API | ||
calls while monitoring the state of a Glue Job. | ||
default is 60s because of jobs that use AWS Glue versions 2.0 and later | ||
have a 1-minute minimum. | ||
[AWS Glue Pricing](https://aws.amazon.com/glue/pricing/?nc1=h_ls) | ||
|
||
Example: | ||
Start a job to AWS Glue Job. | ||
|
||
```python | ||
from prefect import flow | ||
from prefect_aws import AwsCredentials | ||
from prefect_aws.glue_job import GlueJob | ||
|
||
|
||
@flow | ||
def example_run_glue_job(): | ||
aws_credentials = AwsCredentials( | ||
aws_access_key_id="your_access_key_id", | ||
aws_secret_access_key="your_secret_access_key" | ||
) | ||
glue_job = GlueJob( | ||
glue_job_name="your_glue_job_name", | ||
arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"}, | ||
) | ||
return glue_job.run() | ||
|
||
example_run_glue_job() | ||
```""" | ||
|
||
job_name: str = Field( | ||
..., | ||
title="AWS Glue Job Name", | ||
description="The name of the job definition to use.", | ||
) | ||
|
||
arguments: Optional[dict] = Field( | ||
default=None, | ||
title="AWS Glue Job Arguments", | ||
description="The job arguments associated with this run.", | ||
) | ||
job_watch_poll_interval: float = Field( | ||
default=60.0, | ||
description=( | ||
"The amount of time to wait between AWS API calls while monitoring the " | ||
"state of an Glue Job." | ||
), | ||
) | ||
|
||
_error_states = ["FAILED", "STOPPED", "ERROR", "TIMEOUT"] | ||
|
||
type: Literal["glue-job"] = Field( | ||
"glue-job", description="The slug for this task type." | ||
) | ||
|
||
aws_credentials: AwsCredentials = Field( | ||
title="AWS Credentials", | ||
default_factory=AwsCredentials, | ||
description="The AWS credentials to use to connect to Glue.", | ||
) | ||
|
||
@sync_compatible | ||
async def run(self) -> GlueJobResult: | ||
"""Run the Glue Job.""" | ||
glue_client = await run_sync_in_worker_thread(self._get_client) | ||
|
||
return await run_sync_in_worker_thread(self.run_with_client, glue_client) | ||
|
||
@sync_compatible | ||
async def run_with_client(self, glue_job_client: _GlueJobClient) -> GlueJobResult: | ||
"""Run the Glue Job with Glue Client.""" | ||
run_job_id = await run_sync_in_worker_thread(self._start_job, glue_job_client) | ||
exit_code = await run_sync_in_worker_thread( | ||
self._watch_job_and_get_exit_code, glue_job_client, run_job_id | ||
) | ||
|
||
return GlueJobResult(identifier=run_job_id, status_code=exit_code) | ||
|
||
def preview(self) -> str: | ||
""" | ||
Generate a preview of the job information that will be sent to AWS. | ||
""" | ||
preview = "---\n# Glue Job\n" | ||
|
||
preview += f"Target Glue Job Name: {self.job_name}\n" | ||
if self.arguments is not None: | ||
argument_text = "" | ||
for key, value in self.arguments.items(): | ||
argument_text += f" - {key}: {value}\n" | ||
preview += f"Job Arguments: \n{argument_text}" | ||
preview += f"Job Watch Interval: {self.job_watch_poll_interval}s\n" | ||
return preview | ||
|
||
def _start_job(self, glue_job_client: _GlueJobClient) -> str: | ||
""" | ||
Start the AWS Glue Job | ||
[doc](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/start_job_run.html) | ||
""" | ||
self.logger.info( | ||
f"starting job {self.job_name} with arguments {self.arguments}" | ||
) | ||
try: | ||
response = self.__start_job_run(glue_job_client) | ||
job_run_id = str(response["JobRunId"]) | ||
self.logger.info(f"job started with job run id: {job_run_id}") | ||
return job_run_id | ||
except Exception as e: | ||
self.logger.error(f"failed to start job: {e}") | ||
raise RuntimeError | ||
|
||
def _watch_job_and_get_exit_code( | ||
self, glue_job_client: _GlueJobClient, job_run_id: str | ||
) -> Optional[int]: | ||
""" | ||
Wait for the job run to complete and get exit code | ||
""" | ||
self.logger.info(f"watching job {self.job_name} with run id {job_run_id}") | ||
exit_code = 0 | ||
while True: | ||
job = self.__get_job_run(glue_job_client, job_run_id) | ||
job_state = job["JobRun"]["JobRunState"] | ||
if job_state in self._error_states: | ||
# Generate a dynamic exception type from the AWS name | ||
self.logger.error(f"job failed: {job['JobRun']['ErrorMessage']}") | ||
raise RuntimeError(job["JobRun"]["ErrorMessage"]) | ||
elif job_state == "SUCCEEDED": | ||
self.logger.info(f"job succeeded: {job_run_id}") | ||
break | ||
|
||
time.sleep(self.job_watch_poll_interval) | ||
return exit_code | ||
|
||
def _get_client(self) -> _GlueJobClient: | ||
""" | ||
Retrieve a Glue Job Client | ||
""" | ||
boto_session = self.aws_credentials.get_boto3_session() | ||
return boto_session.client("glue") | ||
|
||
def __start_job_run(self, glue_job_client: _GlueJobClient): | ||
return glue_job_client.start_job_run( | ||
JobName=self.job_name, | ||
Arguments=self.arguments, | ||
) | ||
|
||
def __get_job_run(self, glue_job_client: _GlueJobClient, job_run_id: str): | ||
return glue_job_client.get_job_run(JobName=self.job_name, RunId=job_run_id) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
from unittest.mock import MagicMock | ||
|
||
import boto3 | ||
import pytest | ||
from moto import mock_glue | ||
from prefect import flow | ||
|
||
from prefect_aws.glue_job import GlueJob, GlueJobResult | ||
|
||
|
||
@pytest.fixture(scope="function") | ||
def glue_job_client(aws_credentials): | ||
with mock_glue(): | ||
yield boto3.client("glue", region_name="us-east-1") | ||
|
||
|
||
def test_get_client(aws_credentials): | ||
with mock_glue(): | ||
glue_client = GlueJob(job_name="test_job_name", arguments={})._get_client() | ||
assert hasattr(glue_client, "get_job_run") | ||
|
||
|
||
def test_start_job(aws_credentials, glue_job_client): | ||
with mock_glue(): | ||
glue_job_client.create_job( | ||
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={} | ||
) | ||
glue_job = GlueJob(job_name="test_job_name", arguments={"arg1": "value1"}) | ||
print(glue_job.preview()) | ||
res_job_id = glue_job._start_job(glue_job_client) | ||
assert res_job_id == "01" | ||
|
||
|
||
def test_start_job_fail_because_not_exist_job(aws_credentials, glue_job_client): | ||
with mock_glue(): | ||
glue_job = GlueJob(job_name="test_job_name", arguments={}) | ||
with pytest.raises(RuntimeError): | ||
glue_job._start_job(glue_job_client) | ||
|
||
|
||
def test_watch_job_and_get_exit_code(aws_credentials, glue_job_client): | ||
with mock_glue(): | ||
glue_job_client.create_job( | ||
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={} | ||
) | ||
job_run_id = glue_job_client.start_job_run( | ||
JobName="test_job_name", | ||
Arguments={}, | ||
)["JobRunId"] | ||
glue_job = GlueJob( | ||
job_name="test_job_name", arguments={}, job_watch_poll_interval=1.0 | ||
) | ||
glue_job_client.get_job_run = MagicMock( | ||
side_effect=[ | ||
{"JobRun": {"JobName": "test_job", "JobRunState": "RUNNING"}}, | ||
{"JobRun": {"JobName": "test_job", "JobRunState": "SUCCEEDED"}}, | ||
] | ||
) | ||
|
||
exist_code = glue_job._watch_job_and_get_exit_code(glue_job_client, job_run_id) | ||
assert exist_code == 0 | ||
|
||
|
||
def test_watch_job_and_get_exit_fail(aws_credentials, glue_job_client): | ||
with mock_glue(): | ||
glue_job_client.create_job( | ||
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={} | ||
) | ||
job_run_id = glue_job_client.start_job_run( | ||
JobName="test_job_name", | ||
Arguments={}, | ||
)["JobRunId"] | ||
glue_job = GlueJob( | ||
job_name="test_job_name", arguments={}, job_watch_poll_interval=1.0 | ||
) | ||
glue_job_client.get_job_run = MagicMock( | ||
side_effect=[ | ||
{ | ||
"JobRun": { | ||
"JobName": "test_job_name", | ||
"JobRunState": "FAILED", | ||
"ErrorMessage": "err", | ||
} | ||
}, | ||
] | ||
) | ||
|
||
with pytest.raises(RuntimeError): | ||
glue_job._watch_job_and_get_exit_code(glue_job_client, job_run_id) | ||
|
||
|
||
def test_run_with_client(aws_credentials, glue_job_client): | ||
with mock_glue(): | ||
|
||
@flow | ||
def test_run_with_client_flow(): | ||
glue_job_client.create_job( | ||
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={} | ||
) | ||
return GlueJob( | ||
job_name="test_job_name", arguments={}, job_watch_poll_interval=1.0 | ||
).run_with_client(glue_job_client) | ||
|
||
res = test_run_with_client_flow() | ||
assert res == GlueJobResult(identifier="01", status_code=0) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here you are referencing
glue_job_name
but I believe you are actually usingjob_name
instead.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@feliperazeek
Thanks for your comment.
fixed it.
8f9e9af