From 66d0758f1b31a88b8800ed38e3b613bd9d531e2a Mon Sep 17 00:00:00 2001 From: sagarggh Date: Fri, 17 Sep 2021 10:01:35 +0545 Subject: [PATCH 1/3] [datapusher Dag][s] Dag scheduler for stuck jobs resubmit on datapusher --- aircan/dags/datapusher_jobs.py | 81 ++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) create mode 100644 aircan/dags/datapusher_jobs.py diff --git a/aircan/dags/datapusher_jobs.py b/aircan/dags/datapusher_jobs.py new file mode 100644 index 0000000..da84af3 --- /dev/null +++ b/aircan/dags/datapusher_jobs.py @@ -0,0 +1,81 @@ +""" +This DAG is developed to check CKAN datapusher stuck jobs and resubmit them automatically at scheduled time. +You need to provide below environment variables. + DATAPUSHER_SITE_URL = '[URL of CKAN]' + DATAPUSHER_API_KEY = '[User API key from CKAN]' +NOTE: This DAG requires custom API '/api/3/action/datapusher_jobs_list' on CKAN which should return list of datapusher +jobs, otherwise it wouldn't work. +""" + +from urllib.parse import urljoin + +from datetime import datetime, timedelta +import logging +from airflow.utils.dates import days_ago +import requests + +from airflow import DAG +from airflow.operators.python_operator import PythonOperator +from airflow.models import Variable + +CKAN_SITE_URL = Variable.get("DATAPUSHER_SITE_URL") +CKAN_API_KEY = Variable.get("DATAPUSHER_API_KEY") + + +args = { + 'start_date': days_ago(0) +} + +dag = DAG( + dag_id='datapusher_jobs_checks', + default_args=args, + # Everyday at midnight + schedule_interval='0 0 * * *' +) + +def get_datapusher_jobs(ckan_site_url): + try: + response = requests.get( + urljoin(ckan_site_url, '/api/3/action/datapusher_jobs_list') + ) + response.raise_for_status() + if response.status_code == 200: + logging.info(response.json()['result']) + return response.json()['result'] + except requests.exceptions.HTTPError as e: + return e.response.text + + +def resubmit_datapusher(ckan_site_url, ckan_api_key, resource_id): + header = {'Authorization': ckan_api_key} + try: + response = requests.post( + urljoin(ckan_site_url, '/api/3/action/datapusher_submit'), + headers=header, + json={'resource_id': resource_id} + + ) + response.raise_for_status() + if response.status_code == 200: + logging.info('Datapusher successfully resubmitted for {0}'.format(resource_id)) + except requests.exceptions.HTTPError as e: + return e.response.text + + +def datapusher_jobs_checks(): + logging.info('Retrieving datapusher jobs lists') + jobs_dict = get_datapusher_jobs(CKAN_SITE_URL) + for job in jobs_dict: + if job['state'] == 'pending' and job['last_updated']: + time_since_last_updated = datetime.utcnow() - datetime.fromisoformat('2021-09-15 01:42:36.605660') + # Only submit if it has been pending for the last 5 hours. + if time_since_last_updated > timedelta(hours=5): + logging.info('Resubmitting {0}, was pending from {1} ago.'.format(job['entity_id'], time_since_last_updated)) + resubmit_datapusher(CKAN_SITE_URL, CKAN_API_KEY, job['entity_id']) + +datapusher_jobs_checks_task = PythonOperator( + task_id='datapusher_jobs_check', + provide_context=False, + python_callable=datapusher_jobs_checks, + dag=dag, +) From 40d5b23c122d2981996a46715226cb2cc2bb5811 Mon Sep 17 00:00:00 2001 From: sagarggh Date: Sun, 19 Sep 2021 11:59:30 +0545 Subject: [PATCH 2/3] [datapusher Dag][s] hardcoded date replace with job last updated date --- aircan/dags/datapusher_jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aircan/dags/datapusher_jobs.py b/aircan/dags/datapusher_jobs.py index da84af3..96e7ec3 100644 --- a/aircan/dags/datapusher_jobs.py +++ b/aircan/dags/datapusher_jobs.py @@ -67,7 +67,7 @@ def datapusher_jobs_checks(): jobs_dict = get_datapusher_jobs(CKAN_SITE_URL) for job in jobs_dict: if job['state'] == 'pending' and job['last_updated']: - time_since_last_updated = datetime.utcnow() - datetime.fromisoformat('2021-09-15 01:42:36.605660') + time_since_last_updated = datetime.utcnow() - datetime.fromisoformat(job['entity_id']) # Only submit if it has been pending for the last 5 hours. if time_since_last_updated > timedelta(hours=5): logging.info('Resubmitting {0}, was pending from {1} ago.'.format(job['entity_id'], time_since_last_updated)) From 64e5ffac368cb71319566d645d84b13258549109 Mon Sep 17 00:00:00 2001 From: sagarggh Date: Sun, 19 Sep 2021 12:02:25 +0545 Subject: [PATCH 3/3] [datapusher Dag][s] hardcoded date replace with job last updated date --- aircan/dags/datapusher_jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aircan/dags/datapusher_jobs.py b/aircan/dags/datapusher_jobs.py index 96e7ec3..fd0bb99 100644 --- a/aircan/dags/datapusher_jobs.py +++ b/aircan/dags/datapusher_jobs.py @@ -67,7 +67,7 @@ def datapusher_jobs_checks(): jobs_dict = get_datapusher_jobs(CKAN_SITE_URL) for job in jobs_dict: if job['state'] == 'pending' and job['last_updated']: - time_since_last_updated = datetime.utcnow() - datetime.fromisoformat(job['entity_id']) + time_since_last_updated = datetime.utcnow() - datetime.fromisoformat(job['last_updated']) # Only submit if it has been pending for the last 5 hours. if time_since_last_updated > timedelta(hours=5): logging.info('Resubmitting {0}, was pending from {1} ago.'.format(job['entity_id'], time_since_last_updated))