Skip to content

Commit

Permalink
[dag][reporting] cleanup and improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
sagargg committed Jan 27, 2022
1 parent 44134d0 commit 66bccff
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 27 deletions.
51 changes: 25 additions & 26 deletions aircan/dags/ng_data_dictionary_reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from airflow.utils.dates import days_ago
from airflow.operators.email_operator import EmailOperator
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
from sendgrid.helpers.mail import Mail, To
from string import Template
from time import sleep

Expand All @@ -22,24 +22,23 @@
MAIL_TO = Variable.get("NG_MAIL_TO")

# Default arguments handed to the DAG below.
args = {"start_date": days_ago(0)}

# Reporting DAG; the cron expression fires at 00:00 every Sunday.
dag = DAG(
    schedule_interval="0 0 * * 0",
    default_args=args,
    dag_id="ng_data_example_logs",
)

def get_all_datastore_tables():
    """Return the names of the datastore tables listed by CKAN.

    Calls the ``datastore_search`` action on the ``_table_metadata`` resource
    of CKAN_URL and collects the ``name`` field of every returned record.

    Returns:
        list: the table names. An empty list is returned when CKAN answers
        with an HTTP error or with a success status other than 200, so the
        caller can always iterate over the result.
    """
    logging.info("Retriving all datastore tables.")
    try:
        response = requests.get(
            urljoin(CKAN_URL, "/api/3/action/datastore_search?resource_id=_table_metadata"),
            headers={"User-Agent": "ckan-others/latest (internal API call from airflow dag)"},
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        # The error body used to be returned as a string, which the caller
        # then iterated character by character as if each were a resource id.
        logging.error("Failed to retrieve datastore tables: %s", e.response.text)
        return []

    if response.status_code != 200:
        # Any other 2xx/3xx answer carries no usable record list.
        return []

    # NOTE(review): datastore_search is paginated and no limit/offset is sent,
    # so presumably only the first page of tables is read - confirm.
    return [resource["name"] for resource in response.json()["result"]["records"]]
Expand All @@ -49,12 +48,12 @@ def get_data_dictionary(res_id):
try:
logging.info("Retriving fields infomation for {res_id}".format(res_id=res_id))
response = requests.get(
urljoin(CKAN_URL,
'/api/3/action/datastore_search?resource_id={0}&limit=0'.format(res_id))
urljoin(CKAN_URL, "/api/3/action/datastore_search?resource_id={0}&limit=0".format(res_id)),
headers={"User-Agent": "ckan-others/latest (internal API call from airflow dag)"}
)
response.raise_for_status()
if response.status_code == 200:
return response.json()['result']['fields']
return response.json()["result"]["fields"]

except requests.exceptions.HTTPError as e:
logging.error("Failed to get fields infomation for {res_id}".format(res_id=res_id))
Expand All @@ -63,7 +62,6 @@ def get_data_dictionary(res_id):

def check_empty_data_dictionary(ds, **kwargs):
all_datastore_tables = get_all_datastore_tables()
all_datastore_tables = all_datastore_tables[:30]
logging.info("Retriving all datastore tables.")
report = []
for res_id in all_datastore_tables:
Expand Down Expand Up @@ -117,29 +115,30 @@ def HTML_report_generate(resource_list):
return Template(html).safe_substitute(table_row=table_row)

def dispatch_email(**context):
    """Email the empty-data-dictionary report through SendGrid.

    Pulls the result of the ``check_empty_data_dictionary`` task from XCom.
    When that result is non-empty it is rendered with HTML_report_generate and
    sent from MAIL_FROM to every comma-separated address in MAIL_TO; when it
    is empty nothing is sent.

    Args:
        **context: Airflow task context; only ``task_instance`` is read.

    Sending is best-effort: a failure while sending is logged (with its
    traceback) and not re-raised.
    """
    resource_list = context["task_instance"].xcom_pull(task_ids="check_empty_data_dictionary")
    if not resource_list:
        return

    # Strip whitespace and drop blanks so "a@x.org, b@x.org" and a trailing
    # comma do not produce malformed recipient addresses.
    recipients = [
        To(address.strip())
        for address in MAIL_TO.split(",")
        if address.strip()
    ]
    message = Mail(
        from_email=MAIL_FROM,
        to_emails=recipients,
        subject="Empty data dictionary report.",
        html_content=HTML_report_generate(resource_list),
    )

    try:
        response = SendGridAPIClient(SENDGRID_KEY).send(message)
    except Exception:
        # Python 3 exceptions have no generic `.message` attribute, so the old
        # `logging.info(e.message)` raised AttributeError inside the handler.
        logging.exception("Failed to send the report via email.")
    else:
        logging.info("{0} - Report successfully sent via email.".format(response.status_code))


check_empty_data_dictionary_task = PythonOperator(
task_id='check_empty_data_dictionary',
task_id="check_empty_data_dictionary",
provide_context=True,
python_callable=check_empty_data_dictionary,
dag=dag,
)

dispatch_email_task = PythonOperator(
task_id='dispatch_email',
task_id="dispatch_email",
provide_context=True,
python_callable=dispatch_email,
dag=dag,
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,6 @@ werkzeug==0.16.1 # via apache-airflow, flask, flask-caching, flask-jwt-
wtforms==2.3.1 # via flask-admin, flask-wtf
zipp==3.1.0 # via importlib-metadata
zope.deprecation==4.4.0 # via apache-airflow

sendgrid==6.9.4 # via sendgrid
# The following packages are considered to be unsafe in a requirements file:
# setuptools

0 comments on commit 66bccff

Please sign in to comment.