• 27 May 2020
  • Marcin Okuniewski
  • Airflow

From out of the box to totally custom 

In a project where I am partly responsible for integrating Airflow with other components, I was assigned a task that looked simple at first glance: “Sending custom emails with task status”. It seemed easy, because Airflow already has a built-in mechanism for this:

Out of the box 

All you have to do is set up your SMTP server in the [smtp] section of the Airflow configuration file (airflow.cfg by default). The email backend is selected in the [email] section:

[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = <your SMTP host, e.g. smtp.gmail.com>
smtp_starttls = False
smtp_ssl = False
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
# smtp_user = airflow
# smtp_password = airflow
smtp_port = 25
smtp_mail_from = <ex. sender@fromdomain.com>
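
Before restarting Airflow it can help to sanity-check these settings. The sketch below is only an illustration: it parses a config fragment with Python's configparser and flags a few common problems. The helper name check_smtp_settings, the sample host and the rules it applies are assumptions of this example, not part of Airflow.

```python
import configparser

# Sample fragment in the same shape as airflow.cfg (values are placeholders)
SAMPLE_CFG = """
[email]
email_backend = airflow.utils.email.send_email_smtp

[smtp]
smtp_host = smtp.example.com
smtp_starttls = False
smtp_ssl = False
smtp_port = 25
smtp_mail_from = sender@fromdomain.com
"""


def check_smtp_settings(cfg_text):
    """Return a list of human-readable problems found in the [smtp] section."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section("smtp"):
        return ["missing [smtp] section"]

    smtp = parser["smtp"]
    problems = []
    # These three keys are needed for any delivery attempt
    for key in ("smtp_host", "smtp_port", "smtp_mail_from"):
        if not smtp.get(key, "").strip():
            problems.append("{} is not set".format(key))
    # Assumption of this sketch: STARTTLS and implicit SSL are alternatives,
    # so enabling both is treated as a mistake
    if (smtp.getboolean("smtp_starttls", fallback=False)
            and smtp.getboolean("smtp_ssl", fallback=False)):
        problems.append("smtp_starttls and smtp_ssl are both enabled")
    # A user without a password (or the reverse) means SMTP AUTH is half-configured
    if bool(smtp.get("smtp_user")) != bool(smtp.get("smtp_password")):
        problems.append("smtp_user and smtp_password must be set together")
    return problems


print(check_smtp_settings(SAMPLE_CFG))
```

An empty list means none of the checks fired. It does not prove that the server is reachable or accepts the credentials.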

If you only want generic emails about errors or task retries in your workflow, add email_on_failure or email_on_retry, respectively, to the task definitions in your DAG, together with the email parameter. In the example below, Airflow sends a notification email if ‘hello_task’ fails.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          start_date=datetime(2017, 3, 20))

dummy_operator = DummyOperator(task_id='dummy_task',
                               retries=3,
                               email='receiver@host.com',  # or a list of addresses
                               email_on_retry=True,
                               dag=dag)

hello_operator = PythonOperator(task_id='hello_task',
                                email='receiver@host.com',  # or a list of addresses
                                email_on_failure=True,
                                python_callable=print_hello,
                                dag=dag)

dummy_operator >> hello_operator

If you would like to receive emails when any of the tasks fails, you can pass the email parameters in the default_args dictionary as follows:

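from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# Arguments applied to every task in the DAG unless a task overrides them
args = {
    'owner': 'Airflow',
    'start_date': datetime(2017, 3, 20),
    'email': ['receiver@host.com', 'receiver2@host.com'],
    'email_on_failure': True,
}

def print_hello():
    return 'Hello world!'

# The description belongs to the DAG itself, not to the task defaults
dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          default_args=args)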
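
The point of default_args is that every task starts from these values and can still override them individually. The plain-Python sketch below only mimics that precedence with a dictionary merge; resolve_task_args is a hypothetical helper for illustration, not an Airflow function.

```python
default_args = {
    'owner': 'Airflow',
    'email': ['receiver@host.com', 'receiver2@host.com'],
    'email_on_failure': True,
}


def resolve_task_args(defaults, **task_kwargs):
    """Combine DAG-level defaults with task-level arguments; the task wins."""
    resolved = dict(defaults)      # copy, so the shared defaults stay untouched
    resolved.update(task_kwargs)   # explicit task arguments take precedence
    return resolved


# This task opts out of failure emails, the other one inherits the default
quiet_task = resolve_task_args(default_args, task_id='dummy_task',
                               email_on_failure=False)
noisy_task = resolve_task_args(default_args, task_id='hello_task')

print(quiet_task['email_on_failure'], noisy_task['email_on_failure'])
```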

The generic message can be customized. Go back to the Airflow configuration file and add the template paths to the [email] section:

[email]
email_backend = airflow.utils.email.send_email_smtp

subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file

where an example template could look like this (Jinja is the default template engine in Airflow):

Try {{try_number}} out of {{max_tries + 1}}<br>
Exception:<br>{{exception_html}}<br>
Log: <a href="{{ti.log_url}}">Link</a><br>
Host: {{ti.hostname}}<br>
Log file: {{ti.log_filepath}}<br>
Mark success: <a href="{{ti.mark_success_url}}">Link</a><br>

(all dynamic fields are taken from the ‘context’ dictionary)
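
Because every field has to come from the context, a typo in a template usually shows up only when the first email is sent. As a rough safeguard, the sketch below lists the top-level names a template refers to and reports those missing from a given set. It is a regular-expression approximation, not a Jinja parser, and the function names and the set of available names are assumptions made for this example.

```python
import re

TEMPLATE = """Try {{try_number}} out of {{max_tries + 1}}<br>
Exception:<br>{{exception_html}}<br>
Log: <a href="{{ti.log_url}}">Link</a><br>
Host: {{ti.hostname}}<br>
"""

# Captures the first identifier after "{{", e.g. "ti" from "{{ ti.log_url }}"
PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_]\w*)")


def referenced_names(template):
    """Return the sorted top-level names used in {{ ... }} expressions."""
    return sorted(set(PLACEHOLDER.findall(template)))


def missing_names(template, available):
    """Return the names the template uses that are not in `available`."""
    return [name for name in referenced_names(template) if name not in available]


print(referenced_names(TEMPLATE))
# Hypothetical set of names; "exception_html" is left out on purpose
print(missing_names(TEMPLATE, {"ti", "try_number", "max_tries"}))
```

Filters, loops and other Jinja constructs are outside the scope of this check; it only covers simple {{ name }} expressions like the ones in the template above.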

How to succeed 

After a couple of troublesome workflow runs, the mailbox can look a little sad and depressing. What if you also need information about tasks that finished successfully? By default there is no success counterpart to email_on_failure or email_on_retry, but we can use on_success_callback and on_failure_callback.

Each of these two parameters takes a custom Python function, which Airflow calls with the context dictionary as its argument. Let’s take a look at an example:

from datetime import datetime
from airflow import DAG
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email
from airflow.utils.state import State

def notify_email(context):
    """Send custom email alerts."""
    ti = context['ti']
    dag_run = context['dag_run']
    recipient_emails = ['receiver@host.com', 'receiver2@host.com']

    # Link to the task log in the Airflow web UI
    log_link = '{}/log?task_id={}&dag_id={}&execution_date={}'.format(
        conf.get("webserver", "base_url"), ti.task_id, ti.dag_id, ti.execution_date)

    # Email title and body depend on the outcome
    if dag_run.state == State.SUCCESS:
        title = f"Airflow alert: {dag_run.dag_id} succeeded"
        body = """
            Hi Everyone, <br>
            <br>
            Job {dag_id} is a great success.<br>
            <br>
            Forever yours,<br>
            Airflow bot <br>
            """.format(dag_id=dag_run.dag_id)
    elif ti.state == State.FAILED:
        title = f"Airflow alert: {dag_run.dag_id} failed on {ti.task_id}"
        body = """
            Hi Everyone, <br>
            <br>
            Task {task_id} failed.<br>
            Check what went wrong: {log_link}<br>
            <br>
            Forever yours,<br>
            Airflow bot <br>
            """.format(task_id=ti.task_id, log_link=log_link)
    else:
        raise AirflowException(
            '{} task state is not supported in email notifications'.format(ti.state))

    send_email(recipient_emails, title, body)

def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          on_success_callback=notify_email,
          start_date=datetime(2017, 3, 20))

hello_operator = PythonOperator(task_id='hello_task',
                                on_failure_callback=notify_email,
                                python_callable=print_hello,
                                dag=dag)
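
One detail of the log link is easy to miss: an execution date contains characters such as ':' and '+', which have a special meaning in a query string (a bare '+' is read as a space). A minimal sketch of building the same /log link with proper escaping follows. The helper build_log_link and the sample base URL are assumptions for illustration; the ti.log_url attribute used in the template earlier is an alternative when the default link is enough.

```python
from datetime import datetime, timezone
from urllib.parse import urlencode


def build_log_link(base_url, dag_id, task_id, execution_date):
    """Build a link to the task log with URL-encoded query parameters."""
    query = urlencode({
        'task_id': task_id,
        'dag_id': dag_id,
        # ISO 8601 string; urlencode escapes ':' and '+' for us
        'execution_date': execution_date.isoformat(),
    })
    return '{}/log?{}'.format(base_url.rstrip('/'), query)


link = build_log_link('http://localhost:8080', 'hello_world', 'hello_task',
                      datetime(2017, 3, 20, 12, 0, tzinfo=timezone.utc))
print(link)
```

Keeping the link construction in a small function of its own also makes it possible to test it without a running Airflow instance.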

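In this example the callback reads the task instance (ti) and the DAG run (dag_run) from the context dictionary, builds a link to the task log from the webserver base_url setting, and picks the title and body according to the state. The message is sent with send_email from airflow.utils.email, which uses the backend and SMTP settings configured in airflow.cfg. The same function is attached twice: as on_success_callback of the DAG and as on_failure_callback of the task.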

If you need different senders for different workflows, we have to go one step further with customization and use Python's standard libraries (smtplib and email) instead of Airflow's built-in mechanism.

from datetime import datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import smtplib

from airflow import DAG
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator
from airflow.utils.state import State


def notify_email(context):
    """Send custom email alerts."""
    ti = context['ti']
    dag_run = context['dag_run']
    recipient_emails = ['receiver@host.com', 'receiver2@host.com']
    sender_email = 'Airflow bot <airflow@example.com>'

    # Link to the task log in the Airflow web UI
    log_link = '{}/log?task_id={}&dag_id={}&execution_date={}'.format(
        conf.get("webserver", "base_url"), ti.task_id, ti.dag_id, ti.execution_date)

    # Email title and body depend on the outcome
    if dag_run.state == State.SUCCESS:
        title = f"Airflow alert: {dag_run.dag_id} succeeded"
        body = """
            Hi Everyone, <br>
            <br>
            Job {dag_id} is a great success.<br>
            <br>
            Forever yours,<br>
            Airflow bot <br>
            """.format(dag_id=dag_run.dag_id)
    elif ti.state == State.FAILED:
        title = f"Airflow alert: {dag_run.dag_id} failed on {ti.task_id}"
        body = """
            Hi Everyone, <br>
            <br>
            Task {task_id} failed.<br>
            Check what went wrong: {log_link}<br>
            <br>
            Forever yours,<br>
            Airflow bot <br>
            """.format(task_id=ti.task_id, log_link=log_link)
    else:
        raise AirflowException(
            '{} task state is not supported in email notifications'.format(ti.state))

    # Build the message by hand so the sender can differ per workflow
    message = MIMEMultipart()
    message["Subject"] = title
    message["From"] = sender_email
    message["To"] = ", ".join(recipient_emails)
    message.attach(MIMEText(body, "html"))
    with smtplib.SMTP('smtp.host.com', 25) as server:
        server.sendmail(sender_email, recipient_emails, message.as_string())


def print_hello():
    return 'Hello world!'

dag = DAG('hello_world', description='Simple tutorial DAG',
          schedule_interval='0 12 * * *',
          on_success_callback=notify_email,
          start_date=datetime(2017, 3, 20))

hello_operator = PythonOperator(task_id='hello_task',
                                on_failure_callback=notify_email,
                                python_callable=print_hello,
                                dag=dag)
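
To actually vary the sender per workflow, the address can be looked up by DAG id instead of being hard-coded in the callback. The sketch below shows one possible way to do this using only the standard library. The SENDERS mapping, the DEFAULT_SENDER value and the build_message helper are hypothetical names introduced for this example.

```python
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

# Hypothetical mapping from dag_id to the sender shown in the "From" header
SENDERS = {
    'hello_world': 'Hello bot <hello@example.com>',
}
DEFAULT_SENDER = 'Airflow bot <airflow@example.com>'


def build_message(dag_id, recipients, subject, html_body):
    """Build an HTML email whose sender depends on the DAG."""
    message = MIMEMultipart()
    message['Subject'] = subject
    message['From'] = SENDERS.get(dag_id, DEFAULT_SENDER)
    message['To'] = ', '.join(recipients)
    message.attach(MIMEText(html_body, 'html'))
    return message


msg = build_message('hello_world', ['receiver@host.com'],
                    'Airflow alert: hello_world succeeded',
                    'Job hello_world is a great success.<br>')
print(msg['From'])
```

Inside a callback, the result could be passed to smtplib in the same way as in the example above, with msg['From'] as the sender and msg.as_string() as the payload.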

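Here the message is built with MIMEMultipart and MIMEText and sent directly through smtplib, so the sender address is set in the callback itself rather than taken from smtp_mail_from in airflow.cfg. The SMTP host and port are hard-coded in this example.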