1. Introduce GCP Composer, a data job scheduler
2. Demo GCP Composer
Cloud Composer is a workflow management service with which you create, schedule, and monitor data pipelines that span the cloud and on-premises data centers. Composer is built on Apache Airflow, which defines workflows as Python code. For a more detailed description, see the Google documentation.
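Before the demo, you need a Composer environment and a way to deploy DAG files; Composer picks up any file copied into the dags/ folder of the environment's Cloud Storage bucket. A minimal sketch using the google-cloud-storage Python client (the bucket name below is an illustrative assumption; use the one listed on your environment's details page):

from google.cloud import storage

client = storage.Client()
# Assumed bucket name: every Composer environment has its own DAG bucket
bucket = client.bucket('us-central1-example-environment-bucket')
# Copying a file into dags/ deploys it to the environment
bucket.blob('dags/Python_bash.py').upload_from_filename('Python_bash.py')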
1. Creating a Directed Acyclic Graph (DAG) using Python
Python_bash.py
Import statements
import datetime
from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
Set start_date to yesterday so we can be sure the DAG is eligible to run as soon as we upload the Python file:
yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time())

default_dag_args = {'start_date': yesterday}
Set the schedule interval to one day so the DAG runs daily. Airflow triggers a run once start_date plus schedule_interval has passed, so a start date of yesterday plus a one-day interval means the first run starts right after upload:
with models.DAG(
        'running_python_and_bash_operator',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:
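As an aside, schedule_interval also accepts a cron expression or an Airflow preset instead of a timedelta; a comparable daily schedule would be '@daily', which runs at midnight rather than one day after the previous run (illustrative alternative, not used in the demo):

with models.DAG(
        'running_python_and_bash_operator',
        schedule_interval='@daily',
        default_args=default_dag_args) as dag:

The demo keeps the timedelta form.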
Any operator instantiated inside this with block is automatically added to the DAG. The hello_world and greeting callables the operators reference are defined at module level; see the sketch after the task-order line below.
    # Python function task
    hello_world_greeting = python_operator.PythonOperator(
        task_id='python_1',
        python_callable=hello_world)

    # Python function task
    sales_greeting = python_operator.PythonOperator(
        task_id='python_2',
        python_callable=greeting)

    # Initiation of the bash operator
    bash_greeting = bash_operator.BashOperator(
        task_id='bye_bash',
        bash_command='echo Goodbye! Hope to see you soon.')
Set the task order with the >> operator:

    hello_world_greeting >> sales_greeting >> bash_greeting
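The file must also define the two Python callables referenced above. hello_world appears later in its error-raising form; minimal working versions might look like the following (the body of greeting is an assumption, since the original notes never show it):

# Module-level callables, defined above the with block
def hello_world():
    print('Hello World!')
    return 1

def greeting():
    # Assumed body; not shown in the original notes
    print('Greetings from the sales task!')
    return 1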
4. Airflow overview and detail screens
Note: on the Airflow home page, each DAG has a slider button that indicates whether the DAG is active. When working with a test DAG, it is advisable to turn the slider off so the DAG stops triggering and you avoid unnecessary GCP charges.
The following code sets the number of retries to 1 and the retry delay to 2 minutes; these default_args are passed to every task in the DAG:
default_dag_args = {
    'start_date': yesterday,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=2)}
To test this, we also force a ValueError in hello_world to confirm the DAG responds properly when a task fails:
def hello_world():
    raise ValueError('Oops! something went wrong.')
    # Unreachable while the ValueError above is raised
    print('Hello World!')
    return 1
The DAG ran as expected: the task failed with the error and, per the settings above, was retried once after two minutes.
A trigger rule can also be set on the bash operator. The default rule is all_success; one_failed makes the task run as soon as any upstream task has failed:
from airflow.utils import trigger_rule

bash_greeting = bash_operator.BashOperator(
    task_id='bye_bash',
    bash_command='echo Goodbye! Hope to see you soon.',
    trigger_rule=trigger_rule.TriggerRule.ONE_FAILED)
In python_bash_dummy.py, a dummy operator is added to support branching. Because most branches end in a join operation, the dummy operator fills the leg of the branch that otherwise has no task.
# Additions to the Python code
import random

from airflow.operators import dummy_operator

def makeBranchChoice():
    # Returns the task_id of the branch to follow
    x = random.randint(1, 5)
    if x <= 2:
        return 'hello'
    else:
        return 'dummy'

# Inside the with models.DAG(...) block:
    run_this_first = dummy_operator.DummyOperator(
        task_id='run_this_first')

    branching = python_operator.BranchPythonOperator(
        task_id='branching',
        python_callable=makeBranchChoice)

    run_this_first >> branching

    sales_greeting = python_operator.PythonOperator(
        task_id='hello',
        python_callable=greeting)

    dummy_followed_python = dummy_operator.DummyOperator(
        task_id='follow_python')

    dummy = dummy_operator.DummyOperator(
        task_id='dummy')

    bash_greeting = bash_operator.BashOperator(
        task_id='bye_bash',
        bash_command='echo Goodbye! Hope to see you soon.',
        trigger_rule='one_success')
The above run followed the dummy path: the if-else condition drew an x greater than 2, so makeBranchChoice returned 'dummy'. If the dummy leg had not been programmed, the branch would have pointed at a nonexistent task and the DAG would have failed.
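The snippet above leaves out the downstream wiring that joins the two legs back together; a plausible completion, assuming the standard branch-and-join layout (these edges are not shown in the original notes):

    # Connect both legs of the branch to the join task
    branching >> sales_greeting >> dummy_followed_python >> bash_greeting
    branching >> dummy >> bash_greeting

With trigger_rule='one_success', bash_greeting runs as soon as either leg succeeds.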