15 November 2024 - Data Pipeline
Introduction
With data ecosystems constantly growing in size and complexity, the need for resilient data pipelines with a fast time to market is becoming increasingly important. This encourages companies to reach for new methodologies for their ETL processes, such as declarative pipelines, which let developers focus on defining what the desired state of the data should look like without worrying about task execution order.
With the release of Delta Live Tables, Databricks brings this approach to its platform, providing a data processing solution that handles the transformation logic for you, along with other useful capabilities such as automated data quality checks, monitoring and observability.
Declarative approach to data pipelines
The declarative approach isn't unique to data engineering. In DevOps, tools like Kubernetes and Terraform have revolutionized deployment practices by introducing the concept of Infrastructure as Code, which allows engineers to define the desired state of infrastructure and environments rather than specifying each operational step. Similarly, declarative pipelines let engineers define the final state of the database, leaving the operational activities to the underlying system.
Traditionally, ETL pipelines are created by explicitly specifying the order of task execution, focusing on the relationships between tasks to make sure the results are consistent, no required data is missing and no duplicates are introduced.
A typical process can be defined in the following steps (a sketch of such a pipeline in code follows the list):
- Download the files from the storage.
- Load the data to the staging tables.
- Perform transformations on the data, combining data from different sources.
- Load to the final destination and perform data quality checks.
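To make the contrast with the declarative style concrete, below is a minimal imperative sketch of such a pipeline in PySpark. The landing paths, the staging/analytics schema names and the product_id join key are hypothetical placeholders, not part of the example developed later in this article.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Step 1: read the raw files from storage (hypothetical landing paths).
orders = spark.read.json("/landing/sales_orders/")
products = spark.read.option("sep", ";").option("header", "true").csv("/landing/products/")

# Step 2: load the data into staging tables.
orders.write.mode("overwrite").saveAsTable("staging.sales_orders")
products.write.mode("overwrite").saveAsTable("staging.products")

# Step 3: transform, combining data from the different sources.
joined = (
    spark.table("staging.sales_orders")
    .join(spark.table("staging.products"), "product_id", "left")
)

# Step 4: load to the final destination and run a manual data quality check.
joined.write.mode("overwrite").saveAsTable("analytics.product_sales")
assert spark.table("analytics.product_sales").where("product_id IS NULL").count() == 0

Every step, and the order between the steps, is encoded by hand; this coupling between data lineage and execution order is exactly what a declarative engine takes over.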
Although this imperative approach is flexible and well known, it can become difficult to scale and maintain as the pipeline grows in complexity. Because the steps are explicitly defined, the developer has to reflect the data lineage in the pipeline manually, which makes it challenging to get the best performance from parallelisation and scaling while keeping costs minimal.
Declarative pipelines, on the other hand, don't require the order of transformation steps to be specified explicitly. The pipeline author specifies what the final data state should look like, leaving the pipeline engine to automatically determine the actual steps to achieve it, i.e. the order of transformations, task scheduling, dependency handling, resource allocation and optimization. In theory this approach looks like a great option, because it brings scalability and simplicity to increasingly complex data solutions.
This method is indeed a great option in many use cases, but when deciding whether to use a declarative framework we need to consider the relative lack of flexibility: these solutions abstract away operational details, which can make issues harder to debug and troubleshoot. Vendor lock-in can also be a concern because of proprietary syntax or platform-specific integrations.
Introducing Delta Live Tables
Delta Live Tables (DLT) is a declarative ETL framework for building scalable and reliable data processing pipelines. It lets users focus on the transformations and desired data structures, while automatically managing orchestration, compute infrastructure, data quality and error handling.
What is the difference between Delta Live Tables and Delta tables? A Delta table is the default data table format in Databricks; it essentially extends the Parquet file format with support for ACID transactions. Delta Live Tables, however, lets you describe how data flows between those tables, creating them and managing their updates.
DLT uses the following datasets to maintain the results of declarative queries (a Python sketch of each type follows the list):
- Streaming table – a Delta table with support for streaming and incremental data processing. It is always defined against a data source that is continuously or incrementally growing. These tables are designed for append-only data sources and natively support Databricks Autoloader, which enables efficient ingestion of cloud files.
- View – a logical dataset that is computed every time it is queried. Views are useful for dividing large statements into easier-to-manage queries. Delta Live Tables doesn't publish views outside the pipeline, so they work well as intermediate queries that shouldn't be exposed to users.
- Materialized view – a view with precomputed results. These datasets are refreshed on each pipeline update to reflect changes in upstream datasets. Materialized views are useful when multiple downstream queries consume them, because, unlike regular views, they don't have to be recomputed every time they are queried.
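As mentioned below, DLT datasets can also be declared in Python. Here is a minimal, illustrative Python sketch of the three dataset types; the dataset names and the order_number/customer_id columns are assumptions made for this sketch and are not the ones used in the example later in the article.

import dlt
from pyspark.sql import functions as F

# Streaming table: the function returns a streaming DataFrame (Autoloader / cloudFiles).
@dlt.table(comment="Raw sales orders, ingested incrementally")
def orders_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/databricks-datasets/retail-org/sales_orders/")
    )

# View: recomputed whenever it is referenced, never published outside the pipeline.
@dlt.view(comment="Intermediate cleanup step, visible only inside the pipeline")
def orders_cleaned():
    return dlt.read("orders_raw").where(F.col("order_number").isNotNull())

# Materialized view: the function returns a batch DataFrame, so the results are
# precomputed and refreshed on each pipeline update.
@dlt.table(comment="Order counts per customer")
def orders_per_customer():
    return dlt.read("orders_cleaned").groupBy("customer_id").count()

Here spark refers to the SparkSession that Databricks provides in pipeline source files.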
Delta Live Tables uses those datasets to define a pipeline, which is the main unit for running data processing workflows. A pipeline is generally defined in a source file (a notebook or a Python file) that contains all the definitions of streaming tables and materialized views, declared in SQL or in Python. After creating the pipeline code with its logic, we need to create the pipeline itself by providing its configuration. Although most settings are optional and the platform can proceed with the defaults, we need to consider the target schema if we want to publish the data, because by default DLT pipelines don't output any tables to the Hive metastore or Unity Catalog.
Some key features of the Delta Live Tables framework are listed below:
- Automated dependency management and pipeline visualization – DLT determines dependencies across the pipeline and renders a data flow graph to visualize data lineage while checking for syntax errors.
- Data quality enforcement – we can specify expectations to control data accuracy while keeping flexibility in how invalid records are handled: DLT can fail the whole pipeline, drop the erroneous rows, or keep the invalid records while reporting the violation in the dataset metrics (see the sketch after this list).
- Incremental and streaming data processing – DLT supports Autoloader, which allows streaming cloud files in near real-time without provisioning additional cloud services and reduces the amount of data processed compared with full batch reloads.
- Built-in monitoring and observability – DLT provides monitoring and logging capabilities. We can track runtime statistics such as records processed and runtime used, observe pipeline health and monitor data quality trends.
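As referenced in the data quality item above, here is a minimal Python sketch of the three ways an expectation violation can be handled; the table, expectation and column names are hypothetical.

import dlt

# Violations are only recorded in the dataset metrics; the invalid rows are kept.
@dlt.table(comment="Orders with violations tracked in the metrics")
@dlt.expect("non_negative_quantity", "quantity >= 0")
def orders_tracked():
    return dlt.read("orders_raw")

# Invalid rows are dropped from the output.
@dlt.table(comment="Orders with invalid rows dropped")
@dlt.expect_or_drop("valid_customer", "customer_id IS NOT NULL")
def orders_valid():
    return dlt.read("orders_raw")

# A single violating row fails the whole pipeline update.
@dlt.table(comment="Orders where a violation stops the pipeline")
@dlt.expect_or_fail("valid_order_number", "order_number IS NOT NULL")
def orders_strict():
    return dlt.read("orders_raw")

The SQL counterpart of the fail-on-violation mode appears later in the silver-layer example as a CONSTRAINT ... ON VIOLATION FAIL UPDATE clause.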
DLT pipeline example
Pipeline description
In this chapter we will create a sample Delta Live Tables pipeline to demonstrate the capabilities of the framework. The example uses data provided by Databricks, available under /databricks-datasets on DBFS, specifically the retail-org dataset.
The process reads the sales orders data, joins it with the table containing product information and retrieves the list of top-selling product categories. We'll demonstrate how to read CSV and JSON data into bronze layer tables, read the raw data and use a DLT expectation to write cleansed data to the silver layer, and finally write aggregated data to a gold layer table.
Requirements to run the workflow on your Databricks workspace:
- cluster creation permission – the DLT runtime creates a cluster before running the pipeline, so the pipeline will fail if you don't have this permission in your workspace
- access to read/write data in the Hive metastore – the pipeline will output data as tables in the Hive metastore
- Delta Live Tables enabled in your workspace – some older workspaces without the premium pricing tier might not have this feature available
Steps for pipeline creation
First, create a Databricks SQL notebook and include the following code, adding each SQL snippet to a separate notebook cell.
Here we specify the bronze layer, corresponding to data ingestion, which uses Autoloader capabilities to incrementally load new data as it arrives. This creates streaming tables for the sales orders and products. The data is read straight from the CSV/JSON files and is made available as tables for the other pipeline steps.
CREATE OR REFRESH STREAMING TABLE sales_orders
COMMENT "List of sales, ingested from /databricks-datasets."
TBLPROPERTIES ("dataLayer" = "bronze")
AS SELECT *
FROM read_files(
  "/databricks-datasets/retail-org/sales_orders/",
  "json",
  map("cloudFiles.inferColumnTypes", "true")
);

CREATE OR REFRESH STREAMING TABLE products
COMMENT "List of products, ingested from /databricks-datasets."
TBLPROPERTIES ("dataLayer" = "bronze")
AS SELECT *
FROM read_files(
  "/databricks-datasets/retail-org/products/",
  "csv",
  map("sep", ";")
)
In this step we combine the sales data with the products to create a sales table with an added product category column. Notice the LIVE. prefix before each DLT table reference; this syntax indicates a reference to another DLT table available within the pipeline. We also apply a data quality check that will stop pipeline execution if a product_id exists in the sales_orders table but not in products.
CREATE OR REFRESH STREAMING TABLE product_sales_categorised (
  CONSTRAINT valid_products EXPECT (mapping_product_id IS NOT NULL) ON VIOLATION FAIL UPDATE
)
PARTITIONED BY (product_category)
COMMENT "Product sales with product category"
TBLPROPERTIES ("dataLayer" = "silver")
AS SELECT
  product_details.id AS product_id,
  product_details.curr AS currency,
  product_details.name AS product_name,
  product_details.price AS product_price,
  product_details.qty AS quantity,
  product_details.unit AS product_unit,
  product_category,
  product_id AS mapping_product_id
FROM (
  SELECT explode(ordered_products) AS product_details
  FROM STREAM(LIVE.sales_orders)
) s
LEFT JOIN LIVE.products p
  ON s.product_details.id = p.product_id
Finally, we create a gold table that aggregates product sales by category and keeps only the USD currency.
CREATE OR REFRESH MATERIALIZED VIEW top_products
COMMENT "List of top selling products in USD"
TBLPROPERTIES ("dataLayer" = "gold")
AS SELECT
  product_category,
  format_number(SUM(product_sales), 'USD ,###') AS total_sales
FROM (
  SELECT
    product_category,
    product_price * quantity AS product_sales
  FROM LIVE.product_sales_categorised
  WHERE currency = 'USD'
)
GROUP BY product_category
ORDER BY SUM(product_sales) DESC
Note: Delta Live Tables is not designed to run interactively in notebooks. Executing a cell with DLT commands might only check for syntax errors. To run the queries you must configure the notebook as part of a pipeline.
Now that we have our notebook ready, we can go ahead and create the pipeline:
- Go to Delta Live Tables in the sidebar and click Create Pipeline.
- Provide pipeline details:
- Name
- Product edition: Advanced, needed for Data Quality
- Path to the newly created notebook
- Target schema – any schema you have access to; here we are using default on the Hive metastore
- (optional) compute details.
- Click Create.
- Run the pipeline by clicking the Start button in the top panel.
Pipeline execution and results
Once we run the pipeline we can see the execution graph with all the defined datasets. Here we can see the advantage of the declarative approach: we never explicitly said that product_sales_categorised should run after products and sales_orders, but the execution engine figured it out for us.
When we click on one of the pipeline steps we can see the details of that dataset. From here we can go straight to the data catalog to see what the data looks like, which wouldn't be an option if we hadn't specified the target schema when defining the pipeline.
We can also check the data quality expectation defined on the product_sales_categorised table. There are no failures, because all product ids from sales_orders were available in the products table.
We can also go back to our notebook and run the pipeline from there once the notebook is attached to the pipeline. This makes pipeline modifications and debugging easier, because we don't have to go back and forth between the notebook and the DLT pipeline.
Conclusion
Declarative data pipelines simplify the creation and maintenance of complex workflows, allowing data teams to focus on insights over infrastructure. While there are trade-offs in flexibility and possible vendor lock-in, the benefits of faster deployment, easier maintenance and a focus on the final state of the database make declarative pipelines a powerful tool for modern data solutions.
Delta Live Tables is an example of this declarative approach. It lets users create reliable data processing pipelines using Python or SQL. Apart from automatic task orchestration, it provides other useful features, such as data quality controls, incremental data processing and observability, and allows you to leverage the advantages of the Databricks platform.