Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AIP-82 Save references between assets and triggers #43826

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

vincbeck
Copy link
Contributor

@vincbeck vincbeck commented Nov 8, 2024

Resolves #42510.

This PR adds a new attributes watchers to the Asset class and saves references between assets and triggers in the DB. For example:

trigger = SqsSensorTrigger(sqs_queue="my_queue")
asset = Asset("example_asset_watchers", watchers=[trigger])

with DAG(
    dag_id="example_dataset_watcher",
    schedule=[asset],
    catchup=False,
):
    task = EmptyOperator(task_id="task",)

    chain(task)

This PR creates the trigger in the DB if it does not exist and save the reference between asset and trigger.


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@boring-cyborg boring-cyborg bot added area:Scheduler including HA (high availability) scheduler area:task-sdk labels Nov 8, 2024
# task_ids returns a list and lists can't be hashed
if c == "task_ids":
val = tuple(self.task_dict)
# If it is a list, convert to tuple because lists can't be hashed
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashb I made this more generic since watchers is also a list and I thought we could just say "if it is a list, then let's convert it to a tuple". Let me know if you have any concerns

@vincbeck vincbeck force-pushed the vincbeck/aip-82-save-references branch from 947e028 to 598bc69 Compare November 8, 2024 16:01
@vincbeck
Copy link
Contributor Author

vincbeck commented Nov 8, 2024

@Lee-W @uranusjr When working on it I realized that assets are added in the DB from DAG definition but never removed (or at least I did not see the code). Meaning, as a DAG author if I define an asset in my DAG and then later on remove it, the asset is never removed from the DB. Am I wrong? If not, is it intended?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:Scheduler including HA (high availability) scheduler area:task-sdk
Projects
None yet
Development

Successfully merging this pull request may close these issues.

AIP-82. Save references asset <-> triggers when parsing DAGs
1 participant