-
Notifications
You must be signed in to change notification settings - Fork 40
Feature Implementation: AWS Glue Job Execution Support #308
Changes from 36 commits
1f82b74
4fb7a2c
8f9e9af
9d6ac4f
5af0057
5c46568
7adaf5b
d2183db
71552b9
f44c66f
956c7da
c726304
bae5e12
71934c2
e71b947
68b02b8
3b501e2
1dceb73
8390a94
6b3ff0b
c0d611f
3bd4a17
7ea43d4
491191e
9e06097
12299d4
41367dd
0d04479
cb36769
7b9cbad
32a43c0
18e71d7
bd69c20
3bf96cb
f086f03
7048c2a
f3c7d2c
705ffe4
993076b
d92238d
5a4fd2d
12781be
d5c37e2
9f71f03
474c045
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
--- | ||
description: Tasks for interacting with AWS Glue Job | ||
notes: This documentation page is generated from source file docstrings. | ||
--- | ||
|
||
::: prefect_aws.glue_job |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
""" | ||
Integrations with the AWS Glue Job. | ||
|
||
""" | ||
import time | ||
from typing import Any, Optional | ||
|
||
from prefect.blocks.abstract import JobBlock, JobRun | ||
from prefect.utilities.asyncutils import run_sync_in_worker_thread | ||
from pydantic import VERSION as PYDANTIC_VERSION | ||
|
||
if PYDANTIC_VERSION.startswith("2."): | ||
from pydantic.v1 import BaseModel, Field | ||
else: | ||
from pydantic import BaseModel, Field | ||
|
||
from prefect_aws import AwsCredentials | ||
|
||
|
||
class GlueJobRun(JobRun, BaseModel): | ||
"""Execute a Glue Job""" | ||
|
||
job_name: str = Field( | ||
..., | ||
title="AWS Glue Job Name", | ||
description="The name of the job definition to use.", | ||
) | ||
|
||
arguments: Optional[dict] = Field( | ||
default=None, | ||
title="AWS Glue Job Arguments", | ||
description="The job arguments associated with this run.", | ||
) | ||
job_watch_poll_interval: float = Field( | ||
default=60.0, | ||
description=( | ||
"The amount of time to wait between AWS API calls while monitoring the " | ||
"state of an Glue Job." | ||
), | ||
) | ||
|
||
_error_states = ["FAILED", "STOPPED", "ERROR", "TIMEOUT"] | ||
|
||
aws_credentials: AwsCredentials = Field( | ||
title="AWS Credentials", | ||
default_factory=AwsCredentials, | ||
description="The AWS credentials to use to connect to Glue.", | ||
) | ||
|
||
client: Any = Field(default=None, description="") | ||
job_id: str = Field( | ||
default="", | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These look like they could be private fields. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The Glue Job Run responsibilities have changed and require public access. |
||
|
||
async def wait_for_completion(self) -> None: | ||
"""run and wait for completion""" | ||
await run_sync_in_worker_thread(self._get_client) | ||
await run_sync_in_worker_thread(self._start_job) | ||
await run_sync_in_worker_thread(self._watch_job) | ||
|
||
async def fetch_result(self) -> str: | ||
"""fetch glue job result""" | ||
job = self._get_job_run() | ||
return job["JobRun"]["JobRunState"] | ||
|
||
def _start_job(self) -> str: | ||
""" | ||
Start the AWS Glue Job | ||
[doc](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/start_job_run.html) | ||
""" | ||
self.logger.info( | ||
f"starting job {self.job_name} with arguments {self.arguments}" | ||
) | ||
try: | ||
response = self.client.start_job_run( | ||
JobName=self.job_name, | ||
Arguments=self.arguments, | ||
) | ||
job_run_id = str(response["JobRunId"]) | ||
self.logger.info(f"job started with job run id: {job_run_id}") | ||
self.job_id = job_run_id | ||
return job_run_id | ||
except Exception as e: | ||
self.logger.error(f"failed to start job: {e}") | ||
raise RuntimeError | ||
|
||
def _watch_job(self) -> None: | ||
""" | ||
Wait for the job run to complete and get exit code | ||
""" | ||
self.logger.info(f"watching job {self.job_name} with run id {self.job_id}") | ||
while True: | ||
job = self._get_job_run() | ||
job_state = job["JobRun"]["JobRunState"] | ||
if job_state in self._error_states: | ||
# Generate a dynamic exception type from the AWS name | ||
self.logger.error(f"job failed: {job['JobRun']['ErrorMessage']}") | ||
raise RuntimeError(job["JobRun"]["ErrorMessage"]) | ||
elif job_state == "SUCCEEDED": | ||
self.logger.info(f"job succeeded: {self.job_id}") | ||
break | ||
|
||
time.sleep(self.job_watch_poll_interval) | ||
|
||
def _get_client(self) -> None: | ||
""" | ||
Retrieve a Glue Job Client | ||
""" | ||
boto_session = self.aws_credentials.get_boto3_session() | ||
self.client = boto_session.client("glue") | ||
|
||
def _get_job_run(self): | ||
"""get glue job""" | ||
return self.client.get_job_run(JobName=self.job_name, RunId=self.job_id) | ||
|
||
|
||
class GlueJobBlock(JobBlock): | ||
""" | ||
Execute a job to the AWS Glue Job service. | ||
Attributes: | ||
job_name: The name of the job definition to use. | ||
arguments: The job arguments associated with this run. | ||
For this job run, they replace the default arguments set in the job | ||
definition itself. | ||
You can specify arguments here that your own job-execution script consumes, | ||
as well as arguments that Glue itself consumes. | ||
Job arguments may be logged. Do not pass plaintext secrets as arguments. | ||
Retrieve secrets from a Glue Connection, Secrets Manager or other secret | ||
management mechanism if you intend to keep them within the Job. | ||
[doc](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html) | ||
job_watch_poll_interval: The amount of time to wait between AWS API | ||
calls while monitoring the state of a Glue Job. | ||
default is 60s because of jobs that use AWS Glue versions 2.0 and later | ||
have a 1-minute minimum. | ||
[AWS Glue Pricing](https://aws.amazon.com/glue/pricing/?nc1=h_ls) | ||
Example: | ||
Start a job to AWS Glue Job. | ||
```python | ||
from prefect import flow | ||
from prefect_aws import AwsCredentials | ||
from prefect_aws.glue_job import GlueJobBlock | ||
|
||
|
||
@flow | ||
def example_run_glue_job(): | ||
aws_credentials = AwsCredentials( | ||
aws_access_key_id="your_access_key_id", | ||
aws_secret_access_key="your_secret_access_key" | ||
) | ||
glue_job_run = GlueJobBlock( | ||
job_name="your_glue_job_name", | ||
arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"}, | ||
).trigger() | ||
|
||
return glue_job_run.wait_for_completion() | ||
|
||
|
||
example_run_glue_job() | ||
```""" | ||
|
||
job_name: str = Field( | ||
..., | ||
title="AWS Glue Job Name", | ||
description="The name of the job definition to use.", | ||
) | ||
|
||
arguments: Optional[dict] = Field( | ||
default=None, | ||
title="AWS Glue Job Arguments", | ||
description="The job arguments associated with this run.", | ||
) | ||
job_watch_poll_interval: float = Field( | ||
default=60.0, | ||
description=( | ||
"The amount of time to wait between AWS API calls while monitoring the " | ||
"state of an Glue Job." | ||
), | ||
) | ||
|
||
aws_credentials: AwsCredentials = Field( | ||
title="AWS Credentials", | ||
default_factory=AwsCredentials, | ||
description="The AWS credentials to use to connect to Glue.", | ||
) | ||
|
||
async def trigger(self) -> GlueJobRun: | ||
"""trigger for GlueJobRun""" | ||
return GlueJobRun( | ||
job_name=self.job_name, | ||
arguments=self.arguments, | ||
job_watch_poll_interval=self.job_watch_poll_interval, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should start the job run here and then return the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you for your comment. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,11 @@ | ||
[TEST_PROFILE_1] | ||
aws_access_key_id = mock | ||
aws_secret_access_key = mock | ||
aws_region = us-east-1 | ||
aws_default_region = us-east-1 | ||
|
||
[TEST_PROFILE_2] | ||
aws_access_key_id = mock | ||
aws_secret_access_key = mock | ||
aws_secret_access_key = mock | ||
aws_region = us-east-1 | ||
aws_default_region = us-east-1 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
from unittest.mock import MagicMock | ||
|
||
import pytest | ||
from moto import mock_glue | ||
|
||
from prefect_aws.glue_job import GlueJobBlock, GlueJobRun | ||
|
||
|
||
@pytest.fixture(scope="function") | ||
def glue_job_client(aws_credentials): | ||
with mock_glue(): | ||
boto_session = aws_credentials.get_boto3_session() | ||
yield boto_session.client("glue") | ||
|
||
|
||
async def test_fetch_result(aws_credentials, glue_job_client): | ||
glue_job_client.create_job( | ||
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={} | ||
) | ||
job_run_id = glue_job_client.start_job_run( | ||
JobName="test_job_name", | ||
Arguments={}, | ||
)["JobRunId"] | ||
glue_job_run = GlueJobRun(job_name="test_job_name", arguments={"arg1": "value1"}) | ||
glue_job_run.job_id = job_run_id | ||
glue_job_run.client = glue_job_client | ||
result = await glue_job_run.fetch_result() | ||
assert result == "SUCCEEDED" | ||
|
||
|
||
def test_start_job(aws_credentials, glue_job_client): | ||
with mock_glue(): | ||
glue_job_client.create_job( | ||
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={} | ||
) | ||
glue_job_run = GlueJobRun( | ||
job_name="test_job_name", arguments={"arg1": "value1"} | ||
) | ||
glue_job_run.client = glue_job_client | ||
glue_job_run._start_job() | ||
assert glue_job_run.job_id != "" | ||
|
||
|
||
def test_start_job_fail_because_not_exist_job(aws_credentials, glue_job_client): | ||
with mock_glue(): | ||
glue_job_run = GlueJobRun( | ||
job_name="test_job_name", arguments={"arg1": "value1"} | ||
) | ||
glue_job_run.client = glue_job_client | ||
with pytest.raises(RuntimeError): | ||
glue_job_run._start_job() | ||
|
||
|
||
def test_watch_job(aws_credentials, glue_job_client): | ||
with mock_glue(): | ||
glue_job_client.create_job( | ||
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={} | ||
) | ||
glue_job_run = GlueJobRun( | ||
job_name="test_job_name", | ||
arguments={"arg1": "value1"}, | ||
job_watch_poll_interval=0.1, | ||
) | ||
job_run_id = glue_job_client.start_job_run( | ||
JobName="test_job_name", | ||
Arguments={}, | ||
)["JobRunId"] | ||
|
||
glue_job_client.get_job_run = MagicMock( | ||
side_effect=[ | ||
{ | ||
"JobRun": { | ||
"JobName": "test_job_name", | ||
"JobRunState": "RUNNING", | ||
} | ||
}, | ||
{ | ||
"JobRun": { | ||
"JobName": "test_job_name", | ||
"JobRunState": "SUCCEEDED", | ||
} | ||
}, | ||
] | ||
) | ||
glue_job_run.client = glue_job_client | ||
glue_job_run.job_id = job_run_id | ||
glue_job_run._watch_job() | ||
|
||
|
||
def test_watch_job_fail(aws_credentials, glue_job_client): | ||
with mock_glue(): | ||
glue_job_client.create_job( | ||
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={} | ||
) | ||
glue_job_run = GlueJobRun( | ||
job_name="test_job_name", arguments={"arg1": "value1"} | ||
) | ||
job_run_id = glue_job_client.start_job_run( | ||
JobName="test_job_name", | ||
Arguments={}, | ||
)["JobRunId"] | ||
glue_job_run.client = glue_job_client | ||
glue_job_run.job_id = job_run_id | ||
glue_job_client.get_job_run = MagicMock( | ||
side_effect=[ | ||
{ | ||
"JobRun": { | ||
"JobName": "test_job_name", | ||
"JobRunState": "FAILED", | ||
"ErrorMessage": "err", | ||
} | ||
}, | ||
] | ||
) | ||
with pytest.raises(RuntimeError): | ||
glue_job_run._watch_job() | ||
|
||
|
||
def test_get_client(aws_credentials): | ||
with mock_glue(): | ||
glue_job_run = GlueJobRun( | ||
job_name="test_job_name", aws_credentials=aws_credentials | ||
) | ||
glue_job_run._get_client() | ||
assert hasattr(glue_job_run.client, "get_job_run") | ||
|
||
|
||
def test__get_job_run(aws_credentials, glue_job_client): | ||
with mock_glue(): | ||
glue_job_client.create_job( | ||
Name="test_job_name", Role="test-role", Command={}, DefaultArguments={} | ||
) | ||
job_run_id = glue_job_client.start_job_run( | ||
JobName="test_job_name", | ||
Arguments={}, | ||
)["JobRunId"] | ||
|
||
glue_job_run = GlueJobRun( | ||
job_name="test_job_name", arguments={"arg1": "value1"} | ||
) | ||
glue_job_run.job_id = job_run_id | ||
glue_job_run.client = glue_job_client | ||
response = glue_job_run._get_job_run() | ||
assert response["JobRun"]["JobRunState"] == "SUCCEEDED" | ||
|
||
|
||
async def test_trigger(): | ||
glue_job = GlueJobBlock(job_name="test_job_name", arguments={"arg1": "value1"}) | ||
glue_job_run = await glue_job.trigger() | ||
assert isinstance(glue_job_run, GlueJobRun) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class doesn't need to be a Pydantic BaseModel since we don't need validation. I think it might be simpler to make it a plain Python class
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find it useful to depend on pydantic for uniformity with other object definitions, validation, and attribute descriptions.