From 9199785d7243ee371f5cd77b25fe0fed081fa981 Mon Sep 17 00:00:00 2001 From: sagarggh Date: Tue, 25 Jan 2022 21:02:26 +0545 Subject: [PATCH 1/4] [report][dag] Dag added to generate empty data dictionary report and sent via email --- aircan/dags/ng_data_dictionary_reporting.py | 149 ++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 aircan/dags/ng_data_dictionary_reporting.py diff --git a/aircan/dags/ng_data_dictionary_reporting.py b/aircan/dags/ng_data_dictionary_reporting.py new file mode 100644 index 0000000..5ea2b31 --- /dev/null +++ b/aircan/dags/ng_data_dictionary_reporting.py @@ -0,0 +1,149 @@ +import logging +import datetime +import time +from pprint import pprint + +from airflow import DAG +from airflow.exceptions import AirflowException +from airflow.models import Variable +import requests +from urllib.parse import urljoin +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago +from airflow.operators.email_operator import EmailOperator +from sendgrid import SendGridAPIClient +from sendgrid.helpers.mail import Mail +from string import Template +from time import sleep + +CKAN_URL = Variable.get("NG_CKAN_URL") +SENDGRID_KEY = Variable.get("NG_SENDGRID_API_KEY") +MAIL_FROM = Variable.get("NG_MAIL_FROM") +MAIL_TO = Variable.get("NG_MAIL_TO") + +args = { + 'start_date': days_ago(0) +} + +dag = DAG( + dag_id='ng_data_example_logs', + default_args=args, + schedule_interval='0 0 * * *' +) + +def get_all_datastore_tables(): + try: + logging.info("Retriving all datastore tables.") + response = requests.get( + urljoin(CKAN_URL, '/api/3/action/datastore_search?resource_id=_table_metadata') + ) + response.raise_for_status() + if response.status_code == 200: + return [resource['name'] for resource in response.json()['result']['records']] + + except requests.exceptions.HTTPError as e: + return e.response.text + + +def get_data_dictionary(res_id): + try: + logging.info("Retriving fields infomation for {res_id}".format(res_id=res_id)) + response = requests.get( + urljoin(CKAN_URL, + '/api/3/action/datastore_search?resource_id={0}&limit=0'.format(res_id)) + ) + response.raise_for_status() + if response.status_code == 200: + return response.json()['result']['fields'] + + except requests.exceptions.HTTPError as e: + logging.error("Failed to get fields infomation for {res_id}".format(res_id=res_id)) + return False + + +def check_empty_data_dictionary(ds, **kwargs): + all_datastore_tables = get_all_datastore_tables() + all_datastore_tables = all_datastore_tables[:30] + logging.info("Retriving all datastore tables.") + report = [] + for res_id in all_datastore_tables: + # Delay 500ms + sleep(0.5) + fields = get_data_dictionary(res_id) + if(fields): + data_dictionary = ['info' in f for f in fields] + print(data_dictionary) + if True not in data_dictionary: + report.append(res_id) + return report + + +def HTML_report_generate(resource_list): + table_row = "" + for resource in resource_list: + table_row += u"{0}{1}".format(resource, "Empty") + + html = """\ + + + + + + +

List of the resource with empty data dictionaries.

+ + + + + + $table_row +
Resource IDData Dictionary
+ + + """ + return Template(html).safe_substitute(table_row=table_row) + +def dispatch_email(**context): + resource_list = context['task_instance'].xcom_pull(task_ids='check_empty_data_dictionary') + message = Mail( + from_email=MAIL_FROM, + to_emails=('sagar.ghimire@datopian.com', 'info@sagarg.com.np'), + subject='Empty data dictionary report.', + html_content= HTML_report_generate(resource_list)) + try: + sg = SendGridAPIClient(SENDGRID_KEY) + response = sg.send(message) + logging.info("{0} - Report successfully sent via email.".format(response.status_code)) + except Exception as e: + logging.info(e.message) + + +check_empty_data_dictionary_task = PythonOperator( + task_id='check_empty_data_dictionary', + provide_context=True, + python_callable=check_empty_data_dictionary, + dag=dag, +) + +dispatch_email_task = PythonOperator( + task_id='dispatch_email', + provide_context=True, + python_callable=dispatch_email, + dag=dag, +) + +check_empty_data_dictionary_task >> dispatch_email_task \ No newline at end of file From 6908a5d5ba23a2c6c9389c2ad812c7e577c241f0 Mon Sep 17 00:00:00 2001 From: sagarggh Date: Wed, 26 Jan 2022 11:48:08 +0545 Subject: [PATCH 2/4] [daga][reporting] remove hardcode email and replaced with env variable --- aircan/dags/ng_data_dictionary_reporting.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aircan/dags/ng_data_dictionary_reporting.py b/aircan/dags/ng_data_dictionary_reporting.py index 5ea2b31..ea17188 100644 --- a/aircan/dags/ng_data_dictionary_reporting.py +++ b/aircan/dags/ng_data_dictionary_reporting.py @@ -121,7 +121,7 @@ def dispatch_email(**context): resource_list = context['task_instance'].xcom_pull(task_ids='check_empty_data_dictionary') message = Mail( from_email=MAIL_FROM, - to_emails=('sagar.ghimire@datopian.com', 'info@sagarg.com.np'), + to_emails=tuple(MAIL_TO.split(',')), subject='Empty data dictionary report.', html_content= HTML_report_generate(resource_list)) try: From 44134d04c7986b0bf189ffe5d0c2fa864cbe42ad Mon Sep 17 00:00:00 2001 From: sagarggh Date: Wed, 26 Jan 2022 11:55:39 +0545 Subject: [PATCH 3/4] [Dag][cleanup] removed extra print --- aircan/dags/ng_data_dictionary_reporting.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aircan/dags/ng_data_dictionary_reporting.py b/aircan/dags/ng_data_dictionary_reporting.py index ea17188..ce2a4ea 100644 --- a/aircan/dags/ng_data_dictionary_reporting.py +++ b/aircan/dags/ng_data_dictionary_reporting.py @@ -72,7 +72,6 @@ def check_empty_data_dictionary(ds, **kwargs): fields = get_data_dictionary(res_id) if(fields): data_dictionary = ['info' in f for f in fields] - print(data_dictionary) if True not in data_dictionary: report.append(res_id) return report From 66bccff8dd1b59cddb4c5a03aac6a257d59ceba0 Mon Sep 17 00:00:00 2001 From: sagarggh Date: Thu, 27 Jan 2022 14:29:02 +0545 Subject: [PATCH 4/4] [dag][reporting] cleanup and improvement --- aircan/dags/ng_data_dictionary_reporting.py | 51 ++++++++++----------- requirements.txt | 2 +- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/aircan/dags/ng_data_dictionary_reporting.py b/aircan/dags/ng_data_dictionary_reporting.py index ce2a4ea..93fdd13 100644 --- a/aircan/dags/ng_data_dictionary_reporting.py +++ b/aircan/dags/ng_data_dictionary_reporting.py @@ -12,7 +12,7 @@ from airflow.utils.dates import days_ago from airflow.operators.email_operator import EmailOperator from sendgrid import SendGridAPIClient -from sendgrid.helpers.mail import Mail +from sendgrid.helpers.mail import Mail, To from string import Template from time import sleep @@ -22,24 +22,23 @@ MAIL_TO = Variable.get("NG_MAIL_TO") args = { - 'start_date': days_ago(0) + "start_date": days_ago(0) } dag = DAG( - dag_id='ng_data_example_logs', + dag_id="ng_data_example_logs", default_args=args, - schedule_interval='0 0 * * *' + schedule_interval="0 0 * * 0" ) def get_all_datastore_tables(): try: logging.info("Retriving all datastore tables.") - response = requests.get( - urljoin(CKAN_URL, '/api/3/action/datastore_search?resource_id=_table_metadata') - ) + response = requests.get(urljoin(CKAN_URL, "/api/3/action/datastore_search?resource_id=_table_metadata"), + headers = {"User-Agent": "ckan-others/latest (internal API call from airflow dag)"}) response.raise_for_status() if response.status_code == 200: - return [resource['name'] for resource in response.json()['result']['records']] + return [resource["name"] for resource in response.json()["result"]["records"]] except requests.exceptions.HTTPError as e: return e.response.text @@ -49,12 +48,12 @@ def get_data_dictionary(res_id): try: logging.info("Retriving fields infomation for {res_id}".format(res_id=res_id)) response = requests.get( - urljoin(CKAN_URL, - '/api/3/action/datastore_search?resource_id={0}&limit=0'.format(res_id)) + urljoin(CKAN_URL, "/api/3/action/datastore_search?resource_id={0}&limit=0".format(res_id)), + headers={"User-Agent": "ckan-others/latest (internal API call from airflow dag)"} ) response.raise_for_status() if response.status_code == 200: - return response.json()['result']['fields'] + return response.json()["result"]["fields"] except requests.exceptions.HTTPError as e: logging.error("Failed to get fields infomation for {res_id}".format(res_id=res_id)) @@ -63,7 +62,6 @@ def get_data_dictionary(res_id): def check_empty_data_dictionary(ds, **kwargs): all_datastore_tables = get_all_datastore_tables() - all_datastore_tables = all_datastore_tables[:30] logging.info("Retriving all datastore tables.") report = [] for res_id in all_datastore_tables: @@ -117,29 +115,30 @@ def HTML_report_generate(resource_list): return Template(html).safe_substitute(table_row=table_row) def dispatch_email(**context): - resource_list = context['task_instance'].xcom_pull(task_ids='check_empty_data_dictionary') - message = Mail( - from_email=MAIL_FROM, - to_emails=tuple(MAIL_TO.split(',')), - subject='Empty data dictionary report.', - html_content= HTML_report_generate(resource_list)) - try: - sg = SendGridAPIClient(SENDGRID_KEY) - response = sg.send(message) - logging.info("{0} - Report successfully sent via email.".format(response.status_code)) - except Exception as e: - logging.info(e.message) + resource_list = context["task_instance"].xcom_pull(task_ids="check_empty_data_dictionary") + if resource_list: + message = Mail( + from_email=MAIL_FROM, + to_emails=[To("{0}".format(recipient)) for recipient in MAIL_TO.split(',')], + subject="Empty data dictionary report.", + html_content= HTML_report_generate(resource_list)) + try: + sg = SendGridAPIClient(SENDGRID_KEY) + response = sg.send(message) + logging.info("{0} - Report successfully sent via email.".format(response.status_code)) + except Exception as e: + logging.info(e.message) check_empty_data_dictionary_task = PythonOperator( - task_id='check_empty_data_dictionary', + task_id="check_empty_data_dictionary", provide_context=True, python_callable=check_empty_data_dictionary, dag=dag, ) dispatch_email_task = PythonOperator( - task_id='dispatch_email', + task_id="dispatch_email", provide_context=True, python_callable=dispatch_email, dag=dag, diff --git a/requirements.txt b/requirements.txt index 49c879a..772526d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -103,6 +103,6 @@ werkzeug==0.16.1 # via apache-airflow, flask, flask-caching, flask-jwt- wtforms==2.3.1 # via flask-admin, flask-wtf zipp==3.1.0 # via importlib-metadata zope.deprecation==4.4.0 # via apache-airflow - +sendgrid==6.9.4 # via sendgrid # The following packages are considered to be unsafe in a requirements file: # setuptools