A simple package for sending ETL pipeline notifications to Slack.
Using the MailGun API also requires installing the requests library.
Package can be installed through pip.
pip install job-notifications
The repo currently supports two ways to send notifications: 1) via the MailGun API or 2) via Google SMTP email. The method you choose must be declared when instantiating the package (see "Getting Started" below). Both methods require credentials, which can be passed at instantiation or stored in a .env file. For security reasons, storing credentials in a .env file is the recommended method.
Below are the credentials needed for the MailGun API, to be stored in a .env file.
MG_URL=
MG_KEY=
Below are the credentials needed for Google SMTP (Gmail), to be stored in a .env file.
GMAIL_USER=
GMAIL_PASS=
Here are some additional useful variables to store in your project's environment.
JOB_NAME=
TO_ADDRESS=
FROM_ADDRESS=
EXCEPTIONS_LOG_FILE=
If the variables above are in a project's .env file, they will be used when sending notifications. For one-off emails with the Notifications.email() method, the to_address and from_address can be overridden. The EXCEPTIONS_LOG_FILE variable is only necessary if you want this file created at a specific path. If this variable is not set, an exceptions.log file will be created at the project's root.
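The fallback described above can be pictured with a small sketch. This is only an illustration of the documented behavior, not the package's own code; the function name resolve_exceptions_log and its parameters are hypothetical.

```python
import os


def resolve_exceptions_log(env=None, project_root="."):
    """Return the exceptions log path (hypothetical helper, not part of the package).

    Uses EXCEPTIONS_LOG_FILE when it is set and non-empty; otherwise falls
    back to exceptions.log under the given project root.
    """
    env = os.environ if env is None else env
    default_path = os.path.join(project_root, "exceptions.log")
    return env.get("EXCEPTIONS_LOG_FILE") or default_path
```

Passing a plain dict as env makes the lookup easy to try without touching the real environment.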
The entry point to the notifications package is the create_notifications function, which returns a notifications object. The function requires two arguments:
- the name you want to give the job and
- which mail service you want to use (there are currently two services, “mailgun” and “gmail”).
An optional third argument, logs, takes the location of a log file to be attached to the notification message.
from job_notifications import create_notifications
notifications = create_notifications('Some Project', "mailgun", logs='/path/to/some/log.file')
Typically with ELT jobs at KIPP Nor Cal, we use the notifications in a try/except block under the if __name__ == "__main__": block:
if __name__ == '__main__':
    try:
        main()
        notifications.notify()
    except Exception as e:
        notifications.notify(error_message="Uh-oh!")
Importing the built-in traceback module can give better error messages than "Uh-oh!":
import traceback

if __name__ == '__main__':
    try:
        main()
        notifications.notify()
    except Exception as e:
        # Capture the full stack trace of the exception as a string
        stack_trace = traceback.format_exc()
        notifications.notify(error_message=stack_trace)
Here are some extra features that are nice to have.
If a common error occurs that you would like to capture without crashing the ETL job, decorate the function where the error occurs with the @handled_exception decorator.
The decorator takes the exception you expect to catch as an argument, or a tuple of exceptions if you are catching more than one.
import logging

from job_notifications import handled_exception

@handled_exception(ValueError)
def multiples_of_three(x):
    if x % 3 != 0:
        raise ValueError("Not a multiple of three!")
    else:
        logging.info(f"{x} is a multiple of three!")
If any exceptions are caught, the Slack message will have "Succeeded with Warnings" as the subject line, and the message will have a log attached listing all of the exceptions caught and where they occurred.
If a decorated function needs to return None when an exception is handled, set return_none to True. This is useful when a function calls other functions from third-party packages that might raise an exception.
from job_notifications import handled_exception

@handled_exception(ValueError, return_none=True)
def some_func(x):
    y = this_func_raises_an_error(x)
    return y  # This will return None
If you only need to log the exception and not handle it, set re_raise to True. This can be useful when another part of the code handles the exception and you just want to record that it was raised.
from job_notifications import handled_exception

@handled_exception((ValueError, KeyError), re_raise=True)
def some_func(x):
    y = this_func_raises_an_error(x)
    return y
If only certain exceptions need to be re-raised, pass a list of the exceptions to re-raise. The example below will handle a ValueError or a KeyError, but only the ValueError will be re-raised.
from job_notifications import handled_exception

@handled_exception((ValueError, KeyError), re_raise=[ValueError])
def some_func(x):
    y = this_func_raises_an_error(x)
    return y
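To make the re_raise options easier to picture, here is a minimal sketch of how a decorator with this kind of interface could behave. It is not the package's implementation: handled_exception_sketch and parse_kind are hypothetical names, and details such as what gets logged are assumptions.

```python
import functools
import logging


def handled_exception_sketch(exceptions, re_raise=False):
    """Hypothetical stand-in for handled_exception (assumed behavior only).

    exceptions: an exception class or a tuple of exception classes to catch.
    re_raise: True to re-raise everything caught, or a list of exception
    classes to re-raise selectively.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except exceptions as exc:
                # Log the exception with its traceback so it can be reported later.
                logging.exception("Handled %s in %s", type(exc).__name__, func.__name__)
                if re_raise is True:
                    raise
                if re_raise and isinstance(exc, tuple(re_raise)):
                    raise
                # Swallowed exceptions make the call evaluate to None.
                return None

        return wrapper

    return decorator


@handled_exception_sketch((ValueError, KeyError), re_raise=[ValueError])
def parse_kind(kind):
    # Toy function that raises a different exception per input.
    if kind == "value":
        raise ValueError("bad value")
    if kind == "key":
        raise KeyError("missing key")
    return "ok"
```

In this sketch, parse_kind("key") logs the KeyError and returns None, while parse_kind("value") logs the ValueError and then lets it propagate to the caller.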
Want to log how fast a function is?
import logging
from time import sleep

from job_notifications import timer

@timer()
def do_nothing():
    logging.info("I'm doing nothing")
    sleep(20)
It's a fairly basic decorator that logs the module name, the function name, and the time the function took to finish. Sometimes the names of modules and functions can be unpleasant to read. They can be replaced by passing a string:
@timer("This function does nothing")
def do_nothing():
    logging.info("I'm doing nothing")
    sleep(20)
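For readers curious how a timing decorator like this can work, the sketch below shows one common approach using only the standard library. It is an assumption-laden illustration, not the package's code: timer_sketch and add are hypothetical names, and the log message format is made up.

```python
import functools
import logging
import time


def timer_sketch(label=None):
    """Hypothetical stand-in for timer: logs how long the wrapped call took."""

    def decorator(func):
        # Fall back to "module.function" when no friendly label is given.
        name = label or f"{func.__module__}.{func.__name__}"

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                # Runs whether the function returns or raises.
                elapsed = time.perf_counter() - start
                logging.info("%s finished in %.2f seconds", name, elapsed)

        return wrapper

    return decorator


@timer_sketch("Adds two numbers")
def add(a, b):
    return a + b
```

Using a try/finally means the elapsed time is logged even when the wrapped function raises, which is one design choice among several.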