27 May 2020
Airflow: from out of the box to totally custom
On a project where I'm partially responsible for integrating Airflow with other components, I was assigned a task that looked simple at first glance: "sending custom emails with task status". It seemed easy, because Airflow already has a built-in mechanism for exactly that:
Out of the box
First, set up your SMTP server in the email and smtp sections of the Airflow configuration file (airflow.cfg by default):
[email]
email_backend = airflow.utils.email.send_email_smtp
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = <your smtp host, e.g. smtp.gmail.com>
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user = airflow
# smtp_password = airflow
smtp_port = 25
smtp_mail_from = <e.g. [email protected]>
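airflow.cfg is a standard INI file, so before restarting anything you can quickly check that the [smtp] section parses the way you expect. This sketch reads a config fragment with Python's stdlib configparser; the host and address values are just examples:

```python
import configparser

# Example fragment mirroring the [smtp] section of airflow.cfg
cfg_text = """
[smtp]
smtp_host = smtp.gmail.com
smtp_starttls = False
smtp_ssl = False
smtp_port = 25
smtp_mail_from = [email protected]
"""

cfg = configparser.ConfigParser()
cfg.read_string(cfg_text)
print(cfg.getint('smtp', 'smtp_port'))      # 25
print(cfg.getboolean('smtp', 'smtp_ssl'))   # False
```

If a value fails to parse as the type Airflow expects (for example a boolean), this is the quickest place to spot it.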
If you just want generic email notifications about errors or task retries in your workflow, add email_on_failure or email_on_retry respectively to your task definition. In the example below, Airflow sends a notification email when 'hello_task' fails:
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def print_hello():
    return 'Hello world!'


dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20))

dummy_operator = DummyOperator(task_id='dummy_task',
                               retries=3,
                               email='[email protected]',  # a single address or a list of addresses
                               email_on_retry=True,
                               dag=dag)

hello_operator = PythonOperator(task_id='hello_task',
                                email='[email protected]',
                                email_on_failure=True,
                                python_callable=print_hello,
                                dag=dag)

dummy_operator >> hello_operator
If you would like to receive emails whenever any task in the DAG fails, you can instead pass the mail parameters in the default_args dictionary, which applies them to every task:
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

args = {
    'owner': 'Airflow',
    'start_date': datetime(2017, 3, 20),
    'email': ['[email protected]', '[email protected]'],
    'email_on_failure': True
}


def print_hello():
    return 'Hello world!'


# default_args are applied to every task in the DAG
dag = DAG('hello_world', description='Simple tutorial DAG',
          default_args=args,
          schedule_interval='0 12 * * *')

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

dummy_operator >> hello_operator
The generic message can also be modified. Going back to the Airflow config file, add in the email section:
[email]
email_backend = airflow.utils.email.send_email_smtp
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file
where an example template could look like this (Jinja is the default template engine in Airflow):
Try {{try_number}} out of {{max_tries + 1}}<br>
Exception:<br>{{exception_html}}<br>
Log: <a href="{{ti.log_url}}">Link</a><br>
Host: {{ti.hostname}}<br>
Log file: {{ti.log_filepath}}<br>
Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>
(all dynamic fields are taken from ‘context’ dictionary)
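To preview such a template without triggering a real failure, you can render it with Jinja directly. The context values below are made-up stand-ins for what Airflow would actually inject:

```python
from jinja2 import Template

# Hypothetical stand-ins for values from Airflow's task context
context = {'try_number': 2, 'max_tries': 2, 'exception_html': 'ValueError: boom'}

subject = Template('Try {{try_number}} out of {{max_tries + 1}}').render(**context)
print(subject)  # Try 2 out of 3
```

This is handy for iterating on a subject_template or html_content_template file without waiting for a task to fail.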
How to succeed
After a couple of troublesome workflows, your mailbox can start to look a little sad and depressing. But what if you also need information about successfully finished tasks? By default there is no mechanism for success emails analogous to email_on_failure or email_on_retry. However, we can use on_success_callback and on_failure_callback.
These two parameters will trigger any custom Python function and pass the context dictionary to it as an argument. Let's take a look at an example:
from datetime import datetime

from airflow import DAG
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email
from airflow.utils.state import State


def notify_email(context):
    """Send custom email alerts."""
    ti = context['ti']
    dag_run = context['dag_run']
    recipient_emails = ['[email protected]', '[email protected]']
    log_link = '{}/log?task_id={}&dag_id={}&execution_date={}'.format(
        conf.get('webserver', 'base_url'), ti.task_id, ti.dag_id, ti.execution_date)

    # Email title and body depend on whether we were called on success or failure.
    if dag_run.get_state() == State.SUCCESS:
        title = 'Airflow alert: {} succeeded'.format(dag_run.dag_id)
        body = """\
        Hi Everyone, <br>
        <br>
        Job {dag_id} is a great success.<br>
        <br>
        Forever yours,<br>
        Airflow bot <br>
        """.format(dag_id=dag_run.dag_id)
    elif ti.state == State.FAILED:
        title = 'Airflow alert: {} failed on {}'.format(dag_run.dag_id, ti.task_id)
        body = """\
        Hi Everyone, <br>
        <br>
        Task {task_id} failed.<br>
        Check what went wrong: {log_link}<br>
        <br>
        Forever yours,<br>
        Airflow bot <br>
        """.format(task_id=ti.task_id, log_link=log_link)
    else:
        raise AirflowException(
            '{} task state is not supported in email notifications'.format(ti.state))

    send_email(recipient_emails, title, body)
def print_hello():
    return 'Hello world!'


dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          on_success_callback=notify_email,
          start_date=datetime(2017, 3, 20))

hello_operator = PythonOperator(task_id='hello_task',
                                on_failure_callback=notify_email,
                                python_callable=print_hello,
                                dag=dag)
A few words about what happens here: on_failure_callback set on the operator fires notify_email whenever 'hello_task' fails, while on_success_callback set on the DAG fires it when the whole DAG run succeeds. In both cases Airflow passes the context dictionary, from which we take the task instance (ti) and the dag_run to build the message, and send_email delivers it through the SMTP backend configured earlier.
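Because the callback only depends on the context dictionary, you can sanity-check its message-building logic locally by stubbing the context. The stub below is a made-up approximation of Airflow's objects, and the helper extracts just the subject logic from notify_email for easy testing:

```python
from types import SimpleNamespace


def build_failure_subject(context):
    # Same subject logic as in notify_email, pulled out for local testing
    ti = context['ti']
    dag_run = context['dag_run']
    return 'Airflow alert: {} failed on {}'.format(dag_run.dag_id, ti.task_id)


# Hypothetical stand-ins for Airflow's TaskInstance and DagRun objects
fake_context = {
    'ti': SimpleNamespace(task_id='hello_task', state='failed'),
    'dag_run': SimpleNamespace(dag_id='hello_world'),
}

print(build_failure_subject(fake_context))
# Airflow alert: hello_world failed on hello_task
```

Keeping the message-building separate from the actual sending makes the callback much easier to unit-test.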
And if you need to specify a different sender for each workflow, we go one step further with customization: instead of Airflow's built-in mechanism, we use Python's standard email libraries directly.
from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib

from airflow import DAG
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator
from airflow.utils.state import State


def notify_email(context):
    """Send custom email alerts with a per-workflow sender."""
    ti = context['ti']
    dag_run = context['dag_run']
    recipient_emails = ['[email protected]', '[email protected]']
    sender_email = 'Airflow bot <[email protected]>'
    log_link = '{}/log?task_id={}&dag_id={}&execution_date={}'.format(
        conf.get('webserver', 'base_url'), ti.task_id, ti.dag_id, ti.execution_date)

    if dag_run.get_state() == State.SUCCESS:
        title = 'Airflow alert: {} succeeded'.format(dag_run.dag_id)
        body = """\
        Hi Everyone, <br>
        <br>
        Job {dag_id} is a great success.<br>
        <br>
        Forever yours,<br>
        Airflow bot <br>
        """.format(dag_id=dag_run.dag_id)
    elif ti.state == State.FAILED:
        title = 'Airflow alert: {} failed on {}'.format(dag_run.dag_id, ti.task_id)
        body = """\
        Hi Everyone, <br>
        <br>
        Task {task_id} failed.<br>
        Check what went wrong: {log_link}<br>
        <br>
        Forever yours,<br>
        Airflow bot <br>
        """.format(task_id=ti.task_id, log_link=log_link)
    else:
        raise AirflowException(
            '{} task state is not supported in email notifications'.format(ti.state))

    # Build the MIME message by hand so we fully control the From header.
    message = MIMEMultipart()
    message['Subject'] = title
    message['From'] = sender_email
    message['To'] = ', '.join(recipient_emails)
    message.attach(MIMEText(body, 'html'))

    with smtplib.SMTP('smtp.host.com', 25) as server:
        server.sendmail(sender_email, recipient_emails, message.as_string())
def print_hello():
    return 'Hello world!'


dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          on_success_callback=notify_email,
          start_date=datetime(2017, 3, 20))

hello_operator = PythonOperator(task_id='hello_task',
                                on_failure_callback=notify_email,
                                python_callable=print_hello,
                                dag=dag)
The mechanics are the same as before; the only difference is that we build the MIME message ourselves with smtplib and the email package, which lets us set an arbitrary From header per workflow.
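The key part is the hand-built MIME message: the From header is entirely under our control, so each workflow can present its own sender. A minimal, self-contained sketch of just that assembly step (all addresses are made up):

```python
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

sender_email = 'Reporting bot <[email protected]>'  # hypothetical per-workflow sender

message = MIMEMultipart()
message['Subject'] = 'Airflow alert: hello_world succeeded'
message['From'] = sender_email
message['To'] = '[email protected]'
message.attach(MIMEText('Job hello_world is a great success.<br>', 'html'))

raw = message.as_string()
print('From: Reporting bot <[email protected]>' in raw)  # True
```

Whatever you put in the From header is what recipients will see, independently of the smtp_mail_from value in airflow.cfg, since here we bypass Airflow's send_email entirely.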
Visit our blog for more in-depth articles on Airflow:
- Differences between Airflow 1.10.x and 2.0
- Powerful REST API in Airflow 2.0 — what do you need to know?
- How to improve Airflow 2.0 performance with Smart Sensors?