forked from astronomer/2-9-example-dags
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtoy_xcom_big_v_small.py
47 lines (35 loc) · 1.45 KB
/
toy_xcom_big_v_small.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
"""
## Toy DAG to show size dependant custom XCom serialization
This DAG pushes two objects to XCom, one below, one above 1000 bytes.
It then pulls them and prints their sizes.
"""
from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
@dag(
start_date=None,
schedule=None,
catchup=False,
doc_md=__doc__,
tags=["Object Store custom XCom backend", "toy"],
)
def toy_xcom_big_vs_small():
@task
def push_objects(**context) -> None:
"""Create a small and a big dictionary, print their sizes and push them to XCom."""
small_obj = {"a": 23}
big_obj = {f"key{i}": "x" * 100 for i in range(100)}
print(f"Size of small object: {small_obj.__sizeof__()}")
print(f"Size of big object: {big_obj.__sizeof__()}")
context["ti"].xcom_push(key="small_obj", value=small_obj)
context["ti"].xcom_push(key="big_obj", value=big_obj)
@task
def pull_objects(**context) -> None:
"""Pull the small and big dictionaries from XCom and print their sizes."""
small_obj = context["ti"].xcom_pull(task_ids="push_objects", key="small_obj")
big_obj = context["ti"].xcom_pull(task_ids="push_objects", key="big_obj")
print(small_obj)
print(f"Size of small object: {small_obj.__sizeof__()}")
print(big_obj)
print(f"Size of big object: {big_obj.__sizeof__()}")
chain(push_objects(), pull_objects())
toy_xcom_big_vs_small()