Skip to content

Commit

Permalink
k8s: added executor config
Browse files Browse the repository at this point in the history
  • Loading branch information
ErnestaP committed Apr 16, 2024
1 parent a126e4f commit 412ba71
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 4 deletions.
14 changes: 14 additions & 0 deletions dags/executor_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from kubernetes import client as k8s

kubernetes_executor_config = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
image_pull_policy="Always",
)
],
)
),
}
3 changes: 2 additions & 1 deletion dags/open_access/gold_open_access_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from executor_config import kubernetes_executor_config


@dag(
Expand All @@ -12,7 +13,7 @@
params={"year": 2023},
)
def oa_gold_open_access_mechanisms():
@task()
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
year = kwargs["params"].get("year")
golden_access_base_query = (
Expand Down
5 changes: 3 additions & 2 deletions dags/open_access/open_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pendulum
from airflow.decorators import dag, task
from airflow.providers.postgres.operators.postgres import PostgresOperator
from executor_config import kubernetes_executor_config


@dag(
Expand All @@ -12,7 +13,7 @@
params={"year": 2023},
)
def oa_dag():
@task()
@task(executor_config=kubernetes_executor_config)
def fetch_data_task(query, **kwargs):
year = kwargs["params"].get("year")
base_query = (
Expand All @@ -26,7 +27,7 @@ def fetch_data_task(query, **kwargs):
total = utils.get_total_results_count(data.text)
return {type_of_query: total}

@task(multiple_outputs=True)
@task(multiple_outputs=True, executor_config=kubernetes_executor_config)
def join(values, **kwargs):
results = reduce(lambda a, b: {**a, **b}, values)
results["years"] = kwargs["params"].get("year")
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt
apache-airflow[celery, postgres, redis]==2.8.1
apache-airflow[celery, postgres, redis, cncf.kubernetes]==2.8.1
alembic==1.13.1
airflow-provider-alembic==1.0.0

0 comments on commit 412ba71

Please sign in to comment.