How to improve Airlfow 2.0 performance with Smart Sensors
  • 25 October 2021
  • Mikolaj Klepacz
  • Airflow

Intro to Smart Sensors

With the new Airflow version 2.0 many new features have been added to this powerful tool from Apache. One of the updates was the introduction of Smart Sensors, which can drastically improve the Airflow performance if you are using multiple sensors in your dags. In this article, we’ll compare regular and Smart sensors and show you how to configure your first Smart Sensor.

Sensors in Airflow

Sensor is one of the Airflow operator types whose purpose is to basically wait for something to happen. If your process is event-driven, which means that the workflow needs to be started only when some event occurs, you should use Apache Airflow sensors. Typical use cases of sensors: waiting for a file, delaying dag execution, checking if SQL entry has appeared, waiting for other dags to finish.

Example usage of Sensors is presented in the dag below:

Airflow 2.0 sensor - 1

Here we are using FileSensor to create four tasks that are supposed to wait for creation of certain files. Each task wait_for_file_{id} constantly checks whether a file in the specified path has been created and succeeds when it finds the file. In default, the Airflow dag sensor checks if the file exists every 60 seconds, but this can be configured using poke_interval parameter when creating the sensor. 

In the example DAG, process_files task gets triggered only after all the files specified in the sensors are found.

Here is the definition of the mentioned dag example:

 

# dags/sensor.py

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python_operator import PythonOperator

import logging

default_args = {
    "start_date": days_ago(1),
}

dir_path = "/usr/local/airflow/test_data"
file_names = [
    "file1.txt",
    "file2.txt",
    "file3.txt",
    "file4.txt",
]

def _process_files():
    logging.info("Following files detected: ")

    for name in file_names:
        logging.info(f"{dir_path}/{name}")

    logging.info("Process files...")

def _save_data():
    logging.info("save data")

with DAG("sensor_example", default_args=default_args, catchup=False) as dag:

    sensors = [
        FileSensor(
            task_id=f"wait_for_file_{sensor_id}",
            filepath=f"{dir_path}/{name}",
            fs_conn_id="fs_default"
        ) for sensor_id, name in enumerate(file_names, start=1)
    ]

    process = PythonOperator(
        task_id="process_files",
        python_callable=_process_files
    )

    save = PythonOperator(
        task_id="save",
        python_callable=_save_data
    )

    sensors >> process >> save

 

Problems with regular sensors

As mentioned before the sensor operator is very useful if we want to wait for some event to occur in our workflow. But what about the case when we need tens or even hundreds of sensors in our airflow instance? It turns out that having a large number of regular sensors can cause the following issues:

  • Risk of a deadlock – sensors are usually long-running tasks and there is a finite number of worker slots that can be run at once. That is why you may end up using all slots for running the sensors, therefore no other tasks could be triggered anymore, so all your dags can get stuck.
  • Underutilisation of the resources – with the regular sensors each task uses one process which is inefficient, as we are reserving the slots for tasks that are not computationally heavy. In result, our other tasks might be still waiting to be scheduled even though the utilisation of our machine is very low.

To face the mentioned issues, a new version of sensor operator was introduced in Airflow 2.0: Smart Sensor.

What are Smart Sensors, and when should we use them?

The main idea of the Smart Sensor service is to stop assigning one process for each sensor task, because this approach is not efficient. Instead, centralized processes are used to execute those long-running tasks in batches.

To achieve that, the execution of the task is split into two steps:

  1. Registration of the task in Smart Sensor service, and storing the task information in Airflow Metastore DB. When the registration succeeds, the task frees up the worker slot.
  2. Use of a few centralized processes to execute the serialized tasks. Those centralized tasks are stored in the special built-in dags (named smart_sensor_group_shard_xx)

The smart sensors were designed to drastically improve the efficiency of these long-running tasks, which can lead to large savings in the infrastructure costs. You should start using them if your dags contain tens or hundreds of sensors and cause significant underutilization of the resources.

Airflow 2.0 sensor - 4

Registering tasks in smart sensor service, source: https://airflow.apache.org/docs/apache-airflow/stable/concepts/sensors.html

Configuring Smart Sensors in Airflow 2.0

To configure the smart sensor service in Airflow, you first need to enable it in the configuration. Here is the example part of the airflow.cfg which enables the smart sensors.


[smart_sensor]
use_smart_sensor = true
shard_code_upper_limit = 10000

#Users can change the following config based on their requirements
shard = 2

sensors_enabled = NamedHivePartitionSensor,SmartFileSensor

 

Explanation of the configs:

  • use_smart_sensor: Indicates if the smart sensor service is enabled
  • shards: Number of concurrently running smart sensor jobs for the airflow cluster.
  • sensors_enabled: List of sensor class names using the smart sensor.

The next step of the configuration is to upgrade the Airflow database to add the sensor_instance table, which is needed for smart sensors to work. To do that, just run the following command:

airflow db upgrade

Now after restarting airflow you should see 2 new DAGs created automatically:

Airflow 2.0 senseo - 6

Those are the actual smart sensor workers, which pick up and execute the sensor tasks. For smart sensors to work, they need to be turned on manually.

Finally, we can set up our smart sensor. It’s a modified version of the regular sensor that was presented above


First create a new file: $AIRFLOW_HOME/plugins/smart_file_sensor.py which contains the definition of SmartFileSensor. It is a regular file sensor but with necessary additions that are needed for it to work as a smart sensor. File contents are presented below:

# plugins/smart_file_sensor.py

from airflow.sensors.filesystem import FileSensor
from airflow.utils.decorators import apply_defaults
from typing import Any

class SmartFileSensor(FileSensor):
    poke_context_fields = ('filepath', 'fs_conn_id')

    @apply_defaults
    def __init__(self,  **kwargs: Any):
        super().__init__(**kwargs)

    def is_smart_sensor_compatible(self):
        result = (
            not self.soft_fail
            and super().is_smart_sensor_compatible()
        )
        return result

 

Note the poke_context_fields variable. It contains the set of arguments that are expected when creating the sensor.

Then in the sensor.py (example dag with regular sensor from above) add the following import:

from smart_file_sensor import SmartFileSensor

and change the FileSensor to SmartFileSensor:

 sensors = [
        SmartFileSensor(
            task_id=f"wait_for_file_{sensor_id}",
            filepath=f"{dir_path}/{name}",
            fs_conn_id="fs_default"
        ) for sensor_id, name in enumerate(file_names, start=1)
    ]

 

Now the defined sensor tasks should be working as smart sensors.

Note that now when the DAG gets triggered the sensor tasks’ status is set as sensing  until the conditions defined in the sensors are met, whereas the tasks in smart_sensor_group_shard_xx  dags are in running state.

Airflow 2.0 sensor - 9

Conclusion

The smart sensor service is one of the major features introduced in Apache Airflow 2.0. It is used to improve the performance of the sensors by consolidating those smal,l light-weight tasks into a large single process. It should be used when a large number of sensor tasks are used on a single Airflow instance to improve the utilisation of computing resources.

Check out our blog for more details on Airflow: