forked from astronomer/2-9-example-dags
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtoy_custom_names_dynamic_tasks_taskflow.py
65 lines (49 loc) · 2.06 KB
/
toy_custom_names_dynamic_tasks_taskflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""
### Toy DAG to show how to use custom names for dynamic tasks - TaskFlowAPI
This DAG queries the Fruityvice API for information about all fruits and keeps a random sample of them.
It then creates a dynamically mapped task that prints the nutrition facts (sugar, calories, carbs,
protein and fat) of each sampled fruit, with each dynamically mapped task instance labelled with
the fruit's name and sugar content.
"""
from airflow.decorators import dag, task
import requests
from include.helpers import get_display_fruit
@dag(
    start_date=None,
    schedule=None,
    catchup=False,
    doc_md=__doc__,
    tags=["Dynamic Task Mapping", "toy"],
)
def toy_custom_names_dynamic_tasks_taskflow():
    """Build the DAG: fetch a random sample of fruits, then map a task over it."""

    @task
    def get_fruits() -> list[dict]:
        """Return a random sample of fruit records from the fruityvice API.

        Returns:
            Between 10 and 49 randomly chosen records, or every record the
            API returned if it returned fewer than the chosen sample size.

        Raises:
            requests.HTTPError: If the API answers with an error status.
        """
        import random

        rand_int = random.randint(10, 49)
        # A timeout keeps the task from hanging forever on an unresponsive API.
        response = requests.get(
            "https://www.fruityvice.com/api/fruit/all", timeout=30
        )
        # Fail with a clear HTTP error instead of a confusing JSON/sample error.
        response.raise_for_status()
        fruits = response.json()
        # random.sample raises ValueError when asked for more items than the
        # population holds, so cap the sample size at what was returned.
        return random.sample(fruits, min(rand_int, len(fruits)))

    # NEW in Airflow 2.9: Define custom names for the map index
    @task(map_index_template="{{ my_mapping_variable }}")
    def map_fruits(fruit_info: dict):
        """Print one fruit's nutrition facts and set its custom map index.

        Args:
            fruit_info: A fruit record with a "name" key and a "nutritions"
                mapping that holds "sugar", "calories", "carbohydrates",
                "protein" and "fat".
        """
        from airflow.operators.python import get_current_context

        fruit_name = fruit_info["name"]
        nutritions = fruit_info["nutritions"]
        sugar_content = nutritions["sugar"]
        # All values are looked up before anything is printed, so a malformed
        # record fails without producing partial output.
        facts = (
            ("sugar content", sugar_content),
            ("calories", nutritions["calories"]),
            ("carbs", nutritions["carbohydrates"]),
            ("protein", nutritions["protein"]),
            ("fat", nutritions["fat"]),
        )
        for label, value in facts:
            print(f"{fruit_name} {label}: {value}")

        display_fruit = get_display_fruit(fruit_name)

        # create custom map index
        context = get_current_context()
        # The map index is added after the task has run, so it can include any
        # computed values from within the task
        context["my_mapping_variable"] = (
            f"{display_fruit} {fruit_name} - {sugar_content}g sugar."
        )

    map_fruits.expand(fruit_info=get_fruits())


toy_custom_names_dynamic_tasks_taskflow()