diff --git a/aircan/dags/ng_data_dictionary_reporting.py b/aircan/dags/ng_data_dictionary_reporting.py new file mode 100644 index 0000000..93fdd13 --- /dev/null +++ b/aircan/dags/ng_data_dictionary_reporting.py @@ -0,0 +1,147 @@ +import logging +import datetime +import time +from pprint import pprint + +from airflow import DAG +from airflow.exceptions import AirflowException +from airflow.models import Variable +import requests +from urllib.parse import urljoin +from airflow.operators.python_operator import PythonOperator +from airflow.utils.dates import days_ago +from airflow.operators.email_operator import EmailOperator +from sendgrid import SendGridAPIClient +from sendgrid.helpers.mail import Mail, To +from string import Template +from time import sleep + +CKAN_URL = Variable.get("NG_CKAN_URL") +SENDGRID_KEY = Variable.get("NG_SENDGRID_API_KEY") +MAIL_FROM = Variable.get("NG_MAIL_FROM") +MAIL_TO = Variable.get("NG_MAIL_TO") + +args = { + "start_date": days_ago(0) +} + +dag = DAG( + dag_id="ng_data_example_logs", + default_args=args, + schedule_interval="0 0 * * 0" +) + +def get_all_datastore_tables(): + try: + logging.info("Retriving all datastore tables.") + response = requests.get(urljoin(CKAN_URL, "/api/3/action/datastore_search?resource_id=_table_metadata"), + headers = {"User-Agent": "ckan-others/latest (internal API call from airflow dag)"}) + response.raise_for_status() + if response.status_code == 200: + return [resource["name"] for resource in response.json()["result"]["records"]] + + except requests.exceptions.HTTPError as e: + return e.response.text + + +def get_data_dictionary(res_id): + try: + logging.info("Retriving fields infomation for {res_id}".format(res_id=res_id)) + response = requests.get( + urljoin(CKAN_URL, "/api/3/action/datastore_search?resource_id={0}&limit=0".format(res_id)), + headers={"User-Agent": "ckan-others/latest (internal API call from airflow dag)"} + ) + response.raise_for_status() + if response.status_code == 200: + return response.json()["result"]["fields"] + + except requests.exceptions.HTTPError as e: + logging.error("Failed to get fields infomation for {res_id}".format(res_id=res_id)) + return False + + +def check_empty_data_dictionary(ds, **kwargs): + all_datastore_tables = get_all_datastore_tables() + logging.info("Retriving all datastore tables.") + report = [] + for res_id in all_datastore_tables: + # Delay 500ms + sleep(0.5) + fields = get_data_dictionary(res_id) + if(fields): + data_dictionary = ['info' in f for f in fields] + if True not in data_dictionary: + report.append(res_id) + return report + + +def HTML_report_generate(resource_list): + table_row = "" + for resource in resource_list: + table_row += u"
Resource ID | +Data Dictionary | +
---|