This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Feature Implementation: AWS Glue Job Execution Support #308

Merged
merged 45 commits into from
Apr 24, 2024
Changes from 11 commits
Commits
1f82b74
Add Glue Job
knakazawa99 Sep 4, 2023
4fb7a2c
Add Glue Job Doc
knakazawa99 Sep 4, 2023
8f9e9af
Fix Example of PyDoc
knakazawa99 Sep 18, 2023
9d6ac4f
Add pyparsing
knakazawa99 Oct 7, 2023
5af0057
Add Glue Job Worker
knakazawa99 Oct 7, 2023
5c46568
Change mkdocs
knakazawa99 Oct 7, 2023
7adaf5b
Add region to mock_aws_credentials
knakazawa99 Oct 11, 2023
d2183db
Add aws_default_region to mock_aws_credentials
knakazawa99 Oct 13, 2023
71552b9
Fix region name for glue job client test
knakazawa99 Oct 13, 2023
f44c66f
Merge branch 'main' into main
knakazawa99 Nov 6, 2023
956c7da
Merge branch 'main' into main
desertaxle Nov 13, 2023
c726304
Add Class Doc
knakazawa99 Nov 14, 2023
bae5e12
add pydantic v2 support
knakazawa99 Nov 14, 2023
71934c2
Remove optional type hint
knakazawa99 Nov 14, 2023
e71b947
Change dict type hint
knakazawa99 Nov 14, 2023
68b02b8
Add module comment
knakazawa99 Nov 14, 2023
3b501e2
Support pydantic v2
knakazawa99 Nov 14, 2023
1dceb73
Change region name
knakazawa99 Nov 14, 2023
8390a94
use aws_credentials in fixture
knakazawa99 Nov 14, 2023
6b3ff0b
use aws_credentials in test functions
knakazawa99 Nov 14, 2023
c0d611f
Merge branch 'main' into main
knakazawa99 Nov 16, 2023
3bd4a17
Remove Glue Job Worker
knakazawa99 Jan 23, 2024
7ea43d4
Remove Glue Job inherited from Infrastructure, Implement Glue Job Brock
knakazawa99 Jan 24, 2024
491191e
Merge remote-tracking branch 'origin/main'
knakazawa99 Jan 24, 2024
9e06097
Remove Glue Job Worker From Docs
knakazawa99 Jan 24, 2024
12299d4
Merge branch 'main' into main
knakazawa99 Jan 24, 2024
41367dd
Add aws_credentials to test_get_client
knakazawa99 Jan 24, 2024
0d04479
Merge remote-tracking branch 'origin/main'
knakazawa99 Jan 24, 2024
cb36769
Add aws_credentials to test_get_client
knakazawa99 Jan 24, 2024
7b9cbad
remove aws_credentials from test_get_client
knakazawa99 Jan 25, 2024
32a43c0
fix: ref https://github.com/PrefectHQ/prefect-aws/pull/308/commits/6b…
knakazawa99 Jan 25, 2024
18e71d7
Merge branch 'main' into main
desertaxle Feb 12, 2024
bd69c20
Merge branch 'main' into main
desertaxle Feb 27, 2024
3bf96cb
Updates BaseModel import
desertaxle Feb 27, 2024
f086f03
Fixes syntax
desertaxle Feb 27, 2024
7048c2a
Merge branch 'main' into main
desertaxle Feb 27, 2024
f3c7d2c
Refactor: Moved responsibilities of JobRun to JobBlock
knakazawa99 Mar 17, 2024
705ffe4
Merge remote-tracking branch 'origin/main'
knakazawa99 Mar 17, 2024
993076b
Merge branch 'main' into main
knakazawa99 Mar 17, 2024
d92238d
fix degrade
knakazawa99 Mar 17, 2024
5a4fd2d
fix for ut
knakazawa99 Mar 17, 2024
12781be
fix for ut
knakazawa99 Mar 17, 2024
d5c37e2
fix for ut
knakazawa99 Mar 17, 2024
9f71f03
Fix for generated doc
knakazawa99 Mar 17, 2024
474c045
Merge branch 'main' into main
desertaxle Apr 24, 2024
6 changes: 6 additions & 0 deletions docs/glue_job.md
@@ -0,0 +1,6 @@
---
description: Tasks for interacting with AWS Glue Job
notes: This documentation page is generated from source file docstrings.
---

::: prefect_aws.glue_job
6 changes: 6 additions & 0 deletions docs/glue_job_worker.md
@@ -0,0 +1,6 @@
---
description: Worker integration with the AWS Glue Job
notes: This documentation page is generated from source file docstrings.
---

::: prefect_aws.workers.glue_job_worker
2 changes: 2 additions & 0 deletions mkdocs.yml
@@ -89,6 +89,8 @@ nav:
    - ECS: ecs.md
    - Deployments:
        - Steps: deployments/steps.md
    - Glue Job: glue_job.md
    - Glue Job Worker: glue_job_worker.md
    - S3: s3.md
    - Secrets Manager: secrets_manager.md

183 changes: 183 additions & 0 deletions prefect_aws/glue_job.py
@@ -0,0 +1,183 @@
"""
Integrations with AWS Glue jobs.

"""
import time
from typing import Any, Optional

from prefect.infrastructure.base import Infrastructure, InfrastructureResult
from prefect.utilities.asyncutils import run_sync_in_worker_thread, sync_compatible
from pydantic import Field
from typing_extensions import Literal

from prefect_aws import AwsCredentials

_GlueJobClient = Any


class GlueJobResult(InfrastructureResult):
    """The result of a run of a Glue Job"""


class GlueJob(Infrastructure):
    """
    Execute a job on the AWS Glue service.

    Attributes:
        job_name: The name of the job definition to use.
        arguments: The job arguments associated with this run.
            For this job run, they replace the default arguments set in the job
            definition itself.
            You can specify arguments here that your own job-execution script consumes,
            as well as arguments that Glue itself consumes.
            Job arguments may be logged. Do not pass plaintext secrets as arguments.
            Retrieve secrets from a Glue Connection, Secrets Manager or other secret
            management mechanism if you intend to keep them within the Job.
            [doc](https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-glue-arguments.html)
        job_watch_poll_interval: The amount of time to wait between AWS API
            calls while monitoring the state of a Glue Job.
            The default is 60s because jobs that use AWS Glue version 2.0 and
            later have a 1-minute minimum billing duration.
            [AWS Glue Pricing](https://aws.amazon.com/glue/pricing/?nc1=h_ls)

    Example:
        Run an existing AWS Glue job.

        ```python
        from prefect import flow
        from prefect_aws import AwsCredentials
        from prefect_aws.glue_job import GlueJob


        @flow
        def example_run_glue_job():
            aws_credentials = AwsCredentials(
                aws_access_key_id="your_access_key_id",
                aws_secret_access_key="your_secret_access_key"
            )
            glue_job = GlueJob(
                job_name="your_glue_job_name",
                arguments={"--YOUR_EXTRA_ARGUMENT": "YOUR_EXTRA_ARGUMENT_VALUE"},
                aws_credentials=aws_credentials,
            )
            return glue_job.run()

        example_run_glue_job()
        ```"""

    job_name: str = Field(
        ...,
        title="AWS Glue Job Name",
        description="The name of the job definition to use.",
    )

    arguments: Optional[dict] = Field(
        default=None,
        title="AWS Glue Job Arguments",
        description="The job arguments associated with this run.",
    )
    job_watch_poll_interval: float = Field(
        default=60.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of a Glue Job."
        ),
    )

    # Job run states that are treated as failures.
    _error_states = ["FAILED", "STOPPED", "ERROR", "TIMEOUT"]

    type: Literal["glue-job"] = Field(
        "glue-job", description="The slug for this task type."
    )

    aws_credentials: AwsCredentials = Field(
        title="AWS Credentials",
        default_factory=AwsCredentials,
        description="The AWS credentials to use to connect to Glue.",
    )

    @sync_compatible
    async def run(self) -> GlueJobResult:
        """Run the Glue Job."""
        glue_client = await run_sync_in_worker_thread(self._get_client)

        return await run_sync_in_worker_thread(self.run_with_client, glue_client)

    @sync_compatible
    async def run_with_client(self, glue_job_client: _GlueJobClient) -> GlueJobResult:
        """Run the Glue Job with Glue Client."""
        run_job_id = await run_sync_in_worker_thread(self._start_job, glue_job_client)
        exit_code = await run_sync_in_worker_thread(
            self._watch_job_and_get_exit_code, glue_job_client, run_job_id
        )

        return GlueJobResult(identifier=run_job_id, status_code=exit_code)

    def preview(self) -> str:
        """
        Generate a preview of the job information that will be sent to AWS.
        """
        preview = "---\n# Glue Job\n"

        preview += f"Target Glue Job Name: {self.job_name}\n"
        if self.arguments is not None:
            argument_text = ""
            for key, value in self.arguments.items():
                argument_text += f"  - {key}: {value}\n"
            preview += f"Job Arguments: \n{argument_text}"
        preview += f"Job Watch Interval: {self.job_watch_poll_interval}s\n"
        return preview

    def _start_job(self, glue_job_client: _GlueJobClient) -> str:
        """
        Start the AWS Glue Job
        [doc](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/start_job_run.html)
        """
        self.logger.info(
            f"starting job {self.job_name} with arguments {self.arguments}"
        )
        try:
            response = self.__start_job_run(glue_job_client)
            job_run_id = str(response["JobRunId"])
            self.logger.info(f"job started with job run id: {job_run_id}")
            return job_run_id
        except Exception as e:
            self.logger.error(f"failed to start job: {e}")
            # Keep the original error as the cause instead of raising a bare
            # RuntimeError with no message.
            raise RuntimeError(f"failed to start job: {e}") from e

    def _watch_job_and_get_exit_code(
        self, glue_job_client: _GlueJobClient, job_run_id: str
    ) -> Optional[int]:
        """
        Wait for the job run to complete and get exit code
        """
        self.logger.info(f"watching job {self.job_name} with run id {job_run_id}")
        exit_code = 0
        while True:
            job = self.__get_job_run(glue_job_client, job_run_id)
            job_state = job["JobRun"]["JobRunState"]
            if job_state in self._error_states:
                # ErrorMessage is not guaranteed to be present in the response,
                # so fall back to the state name.
                error_message = job["JobRun"].get(
                    "ErrorMessage", f"job run ended in state {job_state}"
                )
                self.logger.error(f"job failed: {error_message}")
                raise RuntimeError(error_message)
            elif job_state == "SUCCEEDED":
                self.logger.info(f"job succeeded: {job_run_id}")
                break

            time.sleep(self.job_watch_poll_interval)
        return exit_code

    def _get_client(self) -> _GlueJobClient:
        """
        Retrieve a Glue Job Client
        """
        boto_session = self.aws_credentials.get_boto3_session()
        return boto_session.client("glue")

    def __start_job_run(self, glue_job_client: _GlueJobClient):
        return glue_job_client.start_job_run(
            JobName=self.job_name,
            # boto3 rejects None for a dict parameter, so send an empty dict
            # when no arguments were configured.
            Arguments=self.arguments or {},
        )

    def __get_job_run(self, glue_job_client: _GlueJobClient, job_run_id: str):
        return glue_job_client.get_job_run(JobName=self.job_name, RunId=job_run_id)
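
The start-then-poll flow in `glue_job.py` can be tried without AWS access. The following is a minimal sketch, not part of the PR: `FakeGlueClient` and `wait_for_job_run` are hypothetical names, and the fake client only mimics the `get_job_run` response shape (`JobRun` / `JobRunState`) that the diff above reads. The injectable `sleep` parameter is an assumption made to keep the example fast.

```python
import time
from typing import Any, Callable, Dict, Iterable

# Failure states, copied from `_error_states` in the diff above.
ERROR_STATES = {"FAILED", "STOPPED", "ERROR", "TIMEOUT"}


class FakeGlueClient:
    """Hypothetical stand-in for a boto3 Glue client; replays canned states."""

    def __init__(self, states: Iterable[str]) -> None:
        self._states = iter(states)

    def get_job_run(self, JobName: str, RunId: str) -> Dict[str, Any]:
        # Same nesting the PR code reads: response["JobRun"]["JobRunState"].
        return {"JobRun": {"JobRunState": next(self._states)}}


def wait_for_job_run(
    client: Any,
    job_name: str,
    run_id: str,
    poll_interval: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_job_run until the run succeeds; raise on a failure state."""
    while True:
        job_run = client.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = job_run["JobRunState"]
        if state in ERROR_STATES:
            # Fall back to the state name when no ErrorMessage is present.
            raise RuntimeError(
                job_run.get("ErrorMessage", f"job run ended in state {state}")
            )
        if state == "SUCCEEDED":
            return state
        sleep(poll_interval)


client = FakeGlueClient(["RUNNING", "RUNNING", "SUCCEEDED"])
final_state = wait_for_job_run(client, "example-job", "jr_123", poll_interval=0.0)
```

Swapping the fake for a real Glue client should leave `wait_for_job_run` unchanged, since it relies only on the `get_job_run` call that the PR code already uses.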
156 changes: 156 additions & 0 deletions prefect_aws/workers/glue_job_worker.py
@@ -0,0 +1,156 @@
import logging
import time
from typing import Any, Optional

import anyio
from prefect.server.schemas.core import FlowRun
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect.workers.base import (
    BaseJobConfiguration,
    BaseVariables,
    BaseWorker,
    BaseWorkerResult,
)
from pydantic import Field

from prefect_aws import AwsCredentials

_GlueJobClient = Any


class GlueJobWorkerConfiguration(BaseJobConfiguration):
    """
    Job configuration for a Glue Job.
    """

    job_name: str = Field(
        ...,
        title="AWS Glue Job Name",
        description="The name of the job definition to use.",
    )
Member

The BaseJobConfiguration class has a name attribute that automatically gets populated with the flow name. I think it'd be worthwhile to use that attribute rather than introducing another name attribute.

Suggested change
    job_name: str = Field(
        ...,
        title="AWS Glue Job Name",
        description="The name of the job definition to use.",
    )

Contributor Author

Member

Ah, does this require a Glue job to exist already? Usually, workers dynamically create jobs to execute flow runs.


I am a user who is very interested in this feature; I have actually forked this into my repo already. I think it's expected that the Glue job exists already.

Member

@feliperazeek are you using the worker to run Glue jobs or the block in this PR?

Member

The variable name is good and will work, but I think the implementation of the worker needs to change to operate the same as our other workers. I'd expect a Prefect Glue worker to create a Glue job for each flow run that it picks up and then monitor that Glue job for completion.

@knakazawa99 I think we can take this PR in one of two directions:

  1. Update the worker and block to create new Glue jobs for executing flow runs
  2. Remove the worker and update the block to implement the JobBlock interface instead of the Infrastructure interface. This will make it more suited for executing existing Glue jobs (which I'm realizing might have been the original intent of the PR, which I misunderstood).

Let me know which direction you'd like to go with this PR, and I will do whatever I can to help. Thank you for your patience and sticking with this PR!


> @feliperazeek are you using the worker to run Glue jobs or the block in this PR?

Running glue jobs

Contributor Author

@desertaxle
Is the following direction correct?

  1. delete Worker
  2. modify the part that inherits Infrastructure to match the Interface of JobBlock

Member

Apologies for the delay. Yes, that is the direction that I'm suggesting.
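
To illustrate the direction agreed in this thread (remove the worker and expose an existing Glue job through a trigger / wait / fetch-result interface), here is a minimal stand-alone sketch. It does not use Prefect: the `JobBlock` and `JobRun` classes below are plain-Python stand-ins that only borrow the method names `trigger`, `wait_for_completion`, and `fetch_result`. `ExistingGlueJob`, `GlueRunHandle`, and `FakeGlueClient` are hypothetical names, and the code is not the implementation that was eventually merged.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Iterable, Optional

# Failure states, copied from the error states used in this PR.
ERROR_STATES = {"FAILED", "STOPPED", "ERROR", "TIMEOUT"}


class JobRun(ABC):
    """Stand-in for a job-run handle (not Prefect's class)."""

    @abstractmethod
    def wait_for_completion(self) -> None:
        ...

    @abstractmethod
    def fetch_result(self) -> Any:
        ...


class JobBlock(ABC):
    """Stand-in for a block that triggers an external job (not Prefect's class)."""

    @abstractmethod
    def trigger(self) -> JobRun:
        ...


class FakeGlueClient:
    """Hypothetical fake that mimics the two Glue client calls used in the PR."""

    def __init__(self, states: Iterable[str]) -> None:
        self._states = iter(states)
        self.started_with: Optional[Dict[str, Any]] = None

    def start_job_run(self, JobName: str, Arguments: Dict[str, str]) -> Dict[str, str]:
        self.started_with = {"JobName": JobName, "Arguments": Arguments}
        return {"JobRunId": "jr_example"}

    def get_job_run(self, JobName: str, RunId: str) -> Dict[str, Any]:
        return {"JobRun": {"JobRunState": next(self._states)}}


class GlueRunHandle(JobRun):
    def __init__(self, client: Any, job_name: str, run_id: str) -> None:
        self.client = client
        self.job_name = job_name
        self.run_id = run_id
        self.state: Optional[str] = None

    def wait_for_completion(self) -> None:
        # A real implementation would sleep between polls; omitted in this sketch.
        while self.state != "SUCCEEDED":
            response = self.client.get_job_run(JobName=self.job_name, RunId=self.run_id)
            self.state = response["JobRun"]["JobRunState"]
            if self.state in ERROR_STATES:
                raise RuntimeError(f"job run ended in state {self.state}")

    def fetch_result(self) -> Optional[str]:
        return self.state


class ExistingGlueJob(JobBlock):
    """Starts a run of a Glue job that already exists; it does not create one."""

    def __init__(
        self, client: Any, job_name: str, arguments: Optional[Dict[str, str]] = None
    ) -> None:
        self.client = client
        self.job_name = job_name
        self.arguments = arguments or {}

    def trigger(self) -> GlueRunHandle:
        response = self.client.start_job_run(
            JobName=self.job_name, Arguments=self.arguments
        )
        return GlueRunHandle(self.client, self.job_name, response["JobRunId"])


client = FakeGlueClient(["RUNNING", "SUCCEEDED"])
job_run = ExistingGlueJob(client, "example-job", {"--KEY": "value"}).trigger()
job_run.wait_for_completion()
result = job_run.fetch_result()
```

Splitting "start" from "wait" this way keeps the block responsible only for an existing job definition, which matches the second option listed in the review comment above.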

Contributor Author

@desertaxle
I have fixed it. However, the test is failing because of boto3. I have not been able to reproduce this error in my local environment.

    arguments: Optional[dict] = Field(
        default=None,
        title="AWS Glue Job Arguments",
        description="The job arguments associated with this run.",
    )
Member

I like how flexible this field is, but it'd be nice to have some stronger typing while maintaining flexibility. Introducing a variables class that contains some values that will often vary between deployments and creating a default template can help with that. You can checkout the ECS worker's variables class as an example.

Contributor Author

Thanks, I fixed it.
Does this match what you had in mind?
e71b947
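
The typed-variables idea raised in this thread can be pictured with a small stand-alone sketch. It uses dataclasses instead of Prefect's `BaseVariables` and pydantic so that it runs without dependencies. `GlueJobVariables`, `JOB_CONFIGURATION_TEMPLATE`, and `resolve_template` are hypothetical names, and the `{{ name }}` placeholder handling is a deliberate simplification, not Prefect's actual template resolution.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Dict


@dataclass
class GlueJobVariables:
    """Values expected to vary between deployments (hypothetical class)."""

    job_name: str
    arguments: Dict[str, str] = field(default_factory=dict)
    job_watch_poll_interval: float = 60.0


# Default template: each configuration key points at one variable by name.
JOB_CONFIGURATION_TEMPLATE = {
    "job_name": "{{ job_name }}",
    "arguments": "{{ arguments }}",
    "job_watch_poll_interval": "{{ job_watch_poll_interval }}",
}


def resolve_template(
    template: Dict[str, str], variables: GlueJobVariables
) -> Dict[str, Any]:
    """Replace each whole-value placeholder with the matching variable."""
    values = asdict(variables)
    resolved: Dict[str, Any] = {}
    for key, placeholder in template.items():
        # "{{ job_name }}" -> "job_name"
        variable_name = placeholder.strip("{} ")
        resolved[key] = values[variable_name]
    return resolved


variables = GlueJobVariables(
    job_name="nightly-etl", arguments={"--TARGET_DATE": "2024-01-01"}
)
configuration = resolve_template(JOB_CONFIGURATION_TEMPLATE, variables)
```

The typed class documents which values a deployment may override, while the template keeps the mapping from variables to job configuration in one place.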

    job_watch_poll_interval: float = Field(
        default=60.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of a Glue Job."
        ),
    )
    aws_credentials: Optional[AwsCredentials] = Field(default_factory=AwsCredentials)
    # Job run states that are treated as failures.
    error_states = ["FAILED", "STOPPED", "ERROR", "TIMEOUT"]


class GlueJobWorkerResult(BaseWorkerResult):
    """
    The result of a Glue job.
    """


class GlueJobWorker(BaseWorker):
    type = "glue-job"
    job_configuration = GlueJobWorkerConfiguration
    job_configuration_variables = BaseVariables
    _description = "Execute flow runs with an AWS Glue Job."
    _display_name = "AWS Glue Job"
    _documentation_url = "https://prefecthq.github.io/prefect-aws/glue_job/"
    _logo_url = "https://images.ctfassets.net/gm98wzqotmnx/1jbV4lceHOjGgunX15lUwT/db88e184d727f721575aeb054a37e277/aws.png?h=250"  # noqa

    async def run(
        self,
        flow_run: FlowRun,
        configuration: GlueJobWorkerConfiguration,
        task_status: Optional[anyio.abc.TaskStatus] = None,
    ) -> GlueJobWorkerResult:
        """Run the Glue Job."""
        glue_job_client = await run_sync_in_worker_thread(
            self._get_client, configuration
        )
        # run_with_client is a coroutine function, so await it directly and
        # pass flow_run, which it needs to build the flow run logger.
        return await self.run_with_client(flow_run, glue_job_client, configuration)

    async def run_with_client(
        self,
        flow_run: FlowRun,
        glue_job_client: _GlueJobClient,
        configuration: GlueJobWorkerConfiguration,
    ) -> GlueJobWorkerResult:
        """Run the Glue Job with Glue Client."""
        logger = self.get_flow_run_logger(flow_run)
        run_job_id = await run_sync_in_worker_thread(
            self._start_job, logger, glue_job_client, configuration
        )
        exit_code = await run_sync_in_worker_thread(
            self._watch_job_and_get_exit_code,
            logger,
            glue_job_client,
            run_job_id,
            configuration,
        )
        return GlueJobWorkerResult(identifier=run_job_id, status_code=exit_code)

    @staticmethod
    def _get_client(configuration: GlueJobWorkerConfiguration) -> _GlueJobClient:
        """
        Retrieve a Glue Job Client
        """
        boto_session = configuration.aws_credentials.get_boto3_session()
        return boto_session.client("glue")

    @staticmethod
    def _start_job(
        logger: logging.Logger,
        glue_job_client: _GlueJobClient,
        configuration: GlueJobWorkerConfiguration,
    ) -> str:
        """
        Start the AWS Glue Job
        [doc](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/client/start_job_run.html)
        """
        logger.info(
            f"starting job {configuration.job_name} with arguments"
            f" {configuration.arguments}"
        )
        try:
            response = glue_job_client.start_job_run(
                JobName=configuration.job_name,
                # boto3 rejects None for a dict parameter, so send an empty
                # dict when no arguments were configured.
                Arguments=configuration.arguments or {},
            )
            job_run_id = str(response["JobRunId"])
            logger.info(f"job started with job run id: {job_run_id}")
            return job_run_id
        except Exception as e:
            logger.error(f"failed to start job: {e}")
            # Keep the original error as the cause instead of raising a bare
            # RuntimeError with no message.
            raise RuntimeError(f"failed to start job: {e}") from e

    @staticmethod
    def _watch_job_and_get_exit_code(
        logger: logging.Logger,
        glue_job_client: _GlueJobClient,
        job_run_id: str,
        configuration: GlueJobWorkerConfiguration,
    ) -> Optional[int]:
        """
        Wait for the job run to complete and get exit code
        """
        logger.info(f"watching job {configuration.job_name} with run id {job_run_id}")
        exit_code = 0
        while True:
            job = glue_job_client.get_job_run(
                JobName=configuration.job_name, RunId=job_run_id
            )
            job_state = job["JobRun"]["JobRunState"]
            if job_state in configuration.error_states:
                # ErrorMessage is not guaranteed to be present in the response,
                # so fall back to the state name.
                error_message = job["JobRun"].get(
                    "ErrorMessage", f"job run ended in state {job_state}"
                )
                logger.error(f"job failed: {error_message}")
                raise RuntimeError(error_message)
            elif job_state == "SUCCEEDED":
                logger.info(f"job succeeded: {job_run_id}")
                break

            time.sleep(configuration.job_watch_poll_interval)
        return exit_code
1 change: 1 addition & 0 deletions requirements-dev.txt
@@ -15,6 +15,7 @@ moto >= 3.1.16, < 4.2.5
mypy
pillow
pre-commit
pyparsing>=3.1.1
pytest
pytest-asyncio
pytest-cov
3 changes: 2 additions & 1 deletion requirements.txt
@@ -3,4 +3,5 @@ botocore>=1.27.53
mypy_boto3_s3>=1.24.94
mypy_boto3_secretsmanager>=1.26.49
prefect>=2.13.5
tenacity>=8.0.0
pyparsing>=3.1.1
tenacity>=8.0.0
6 changes: 5 additions & 1 deletion tests/mock_aws_credentials
@@ -1,7 +1,11 @@
[TEST_PROFILE_1]
aws_access_key_id = mock
aws_secret_access_key = mock
aws_region = us-west-1
aws_default_region = us-west-1

[TEST_PROFILE_2]
aws_access_key_id = mock
aws_secret_access_key = mock
aws_secret_access_key = mock
aws_region = us-west-1
aws_default_region = us-west-1