Share this post

Databricks testing with GitHub Actions

To those who inspired it and will never read it. And for Marcin!

 

Introduction

Databricks Connect (more info here) provides a good way of interacting with the Azure Databricks clusters on your local machine (either using IDE or any custom application). The setup is simple – you provide the URL and personal token generated from the Databricks Workspace and you can start working with the DBFS, clusters etc. This works flawlessly until you want to start running your application code locally, e.g. if you need to use dbutils. For this, you need to generate another personal token with elevated permissions (to be done only in a Databricks notebook) that is valid only for 48 hours. For some basic checks this is more than enough but if you would like to use it in a more automated solution, there is a need to regenerate this token every two days to make it work. As I have been looking forward to run some integration tests for my Databricks application, I needed to find a more robust solution as it was going to execute for my develop branch each time a push was detected.

Need help with databricks? Check our data engineering services and see how we can help you.

 

The idea of integration testing for my app

As my application had more than 50 data feeds, each organized in separate python class covering end-to-end processing, from reading from the source, transformations through save into the final layer into BLOB storage and Azure Synapse table. As there were some business users (power users actually) using the DEV Databricks environment, I wanted to have a way to ensure that each change merged from a feature branch with the develop branch is working properly before it was actually installed into the DEV Databricks.

Originally my GitHub workflow was releasing the develop branch each time it detected a push to an application directory. The idea was to test the develop branch before it was released into the application directory and based on the test result only if success, released into the destination directory.

The new workflow was going to look as follows:

 

  1. Install prerequisites

 

  1. Deploy to the Databricks TEST directory

 

  1. Test dummy load using the TEST directory

 

  1. Deploy to the Databricks destination directory

The only problem was that I still needed this elevated permission token to be able to run my classes locally on the provisioned machine that was executing the workflow.

 

Solution – Databricks Jobs API

Databricks Job API provides the ability to run jobs inside the Databricks Workspace. It can be a custom job or linked to an existing notebook. Once we are able to setup a notebook that executes all the operations we need inside the Databricks, we don’t need to have the elevated permissions token anymore and we are able to setup an automated way to test the feeds.

Let’s consider this simple feed class below. I have created it for this article only so it is missing lots of features and basically all it does, is to read from the source and save it into the BLOB storage (both locations mounted inside the DBFS):

 

from pyspark.sql import SparkSession 

class TestDataFeed(object): 
  """ Dummy class for integration testing """ 

  def __init__(self, dbutils): 
    self._df = None 
    self._ctx = SparkSession.builder.getOrCreate() 
    self.dbutils = dbutils 
    self.source_dir = '/mnt/test_in/integration_testing/srce_dir' 
    self.destination_dir = '/mnt/test_out/integration_testing/dest_dir' 

  def read(self, path=None): 
    path = path or self.source_dir 
    spark = self._ctx 
    opts = dict(format='parquet', compression='snappy') 

    self._df = spark.read.load(path, **opts) 

    return self 

  def transform(self): 
    pass 

  def save2blob(self, path=None): 
    path = path or self.destination_dir 
    opts = dict(mode='overwrite', format='parquet', compression='snappy') 

    self._df.write.save(path, **opts) 

  def run(self): 
    self.read() 
    self.transform() 
    self.save2blob() 

A simple test for this class would only read from the source directory and count the number of records fetched. The method will look like the below:

 

def test_TestDataFeed(): 
    o = TestDataFeed(dbutils) 
    o.read() 
    o.transform() 
    y = o._df.count() 
    assert y>0, "TestDataFeed dummy pull" 

In general tests can be more thorough and check the results of the transformations and if the save into the destination directory and/or ASDW table succeeded but for this case we will complete the test with only this check.

Normally the test method would be placed in the module inside the tests directory aside of the module tested:

•    module_tested.py
—    class TestDataFeed
•    tests/
—    test_module_tested.py
•   test_TestDataFeed()

But to be able to use dbutils, we will need to modify it a little and we need to put it in a Databricks notebook so it can use dbutils.

Instead of creating a notebook for every test function, the notebook will be generic and will be able to read the class name passed as a parameter. Based on the class name it will import it dynamically and execute the test function. It can look like the below example:

 

import sys 
sys.path.insert(0, '/dbfs/your_app_directory_test') 
import datetime 
import importlib 

cls_name = getArgument("class_name") 
pkg_name = getArgument("pkg_name") 

try: 
  if not cls_name or not pkg_name: 
    raise ImportError('Class name or Package name cannot be empty!') 

  d=datetime.datetime.now() 
  ex = None 

  clss = None 
  o = None 

  module = importlib.import_module(pkg_name) 
  clss = getattr(module, cls_name) 
  o = clss() 
  o.read() 
  o.transform() 
  y = o._df.count() 
  assert y>0, cls_name + " dummy pull" 
except Exception as e: 
  ex = e 
finally: 
  dbutils.notebook.exit(ex) 

To execute this notebook we will need a small util that has the ability to call the Databricks Jobs API, which can look like this:

 

def run_notebook( 
    notebook_path, 
    cluster_id, 
    dbr_credentials, 
    base_url, 
    cls_name, 
    pkg_name, 
    ): 

    def_timeout_sec = 180 
    def_check_timeout_sec = 5 

    job_name = 'Integration dummy test' 

    base_parameters = {'class_name': cls_name, 'pkg_name': pkg_name} 

    job_submit_data = { 
        'run_name': job_name, 
        'existing_cluster_id': cluster_id, 
        'timeout_seconds': def_timeout_sec, 
        'notebook_task': {'notebook_path': notebook_path, 
                          'base_parameters': base_parameters}, 
        } 

    run_cmd = requests.post(base_url + 'api/2.0/jobs/runs/submit', 
                            data=json.dumps(job_submit_data), 
                            auth=dbr_credentials) 

    runjson = run_cmd.text 
    d = json.loads(runjson) 
    job_run_id = d['run_id'] 

    print 'Job submitted (Run_id: ' + str(job_run_id) + ')' 

    status_cmd = requests.get(base_url + 'api/2.0/jobs/runs/get?run_id=' 
                               + str(job_run_id), 
                              data=json.dumps(job_submit_data), 
                              auth=dbr_credentials) 
    jobjson = status_cmd.text 
    j = json.loads(jobjson) 
    print 'Run page URL: ' + j['run_page_url'] 

    i = 0 
    job_timed_out = False 
    while not job_timed_out: 
        time.sleep(def_check_timeout_sec) 
        status_cmd = requests.get(base_url 
                                  + 'api/2.0/jobs/runs/get?run_id=' 
                                  + str(job_run_id), 
                                  data=json.dumps(job_submit_data), 
                                  auth=dbr_credentials) 
        jobjson = status_cmd.text 
        j = json.loads(jobjson) 
        job_current_state = j['state']['life_cycle_state'] 
        job_run_id = j['run_id'] 
        if job_current_state in ['TERMINATED', 'INTERNAL_ERROR', 
                                 'SKIPPED']: 
            print 'Job status: ' + job_current_state 
            break 

        # check for 30 minutes max 

        if i >= def_timeout_sec: 
            job_timed_out = True 
            status_cmd = requests.get(base_url 
                    + 'api/2.0/jobs/runs/get-output?run_id=' 
                    + str(job_run_id), 
                    data=json.dumps(job_submit_data), 
                    auth=dbr_credentials) 
            jobjson = status_cmd.text 
            j = json.loads(jobjson) 
            print 'Job status: TIMED OUT' 
            print 'Job takes more than expected. Please consult job log for more information: ' 
            print j['metadata']['run_page_url'] 
            break 
        i = i + 1 

    if not job_timed_out: 
        status_cmd = requests.get(base_url 
                                  + 'api/2.0/jobs/runs/get-output?run_id=' 
                                   + str(job_run_id), 
                                  data=json.dumps(job_submit_data), 
                                  auth=dbr_credentials) 
        jobjson = status_cmd.text 
        j = json.loads(jobjson) 
        job_output = j['notebook_output']['result'] 

        assert job_output == 'None', job_output 
        print 'Job SUCCEEDED'

As this script can be executed locally without special token, I have replaced the test_TestDataFeed method to run run_notebook instead of the actual test. It will go to Databricks and execute for me the notebook that is able to run my tests.

The exempt will look now like below. Keep in mind that I am passing the Databricks token as an env variable, as this is being kept in the GitHub secrets.

 

BASE_URL = '<base_url_of_databricks>' 
NOTEBOOK_PATH = '<notebook_path_of_your_test_notebook>' 
DBR_TOKEN = os.environ.get('DBR_TOKEN') 
DBR_CREDENTIALS = ('token', DBR_TOKEN) 

def test_TestDataFeed(): 
    run_notebook( 
        cluster_id=CLUSTER_ID, 
        dbr_credentials=DBR_CREDENTIALS, 
        base_url=BASE_URL, 
        notebook_path=NOTEBOOK_PATH, 
        cls_name='TestDataFeed', 
        pkg_name='modules.data_feed_test', 
        ) 

The workflow for GitHub action will then execute pytest to run the tests. The workflow should be completed in the following steps:

 

name: Continous Deployment - Databricks 

on: 
  push: 
    branches: [ develop, master ] 

jobs: 
  build: 

    runs-on: ubuntu-latest 

    steps: 
    - uses: actions/checkout@v2 

    - name: Set up Python 3.7 
      uses: actions/setup-python@v2 
      with: 
        python-version: 3.7 

    - name: Install prerequisites 
      run: | 
        pip install -r your_app_directory/requirements.txt 
        cat > ~/.databrickscfg <<EOF 
        [DEFAULT] 
        host = https://eastus2.azuredatabricks.net 
        token = $([ ${GITHUB_REF##*/} == "master" ] && echo "${{ secrets.DB_TOKEN_PROD }}" || echo "${{ secrets.DB_TOKEN_DEV }}") 
        EOF 

    - name: Deploy to the Databricks test instance 
      run: | 
        dbfs rm -r dbfs:/your_app_directory_test 
        dbfs mkdirs dbfs:/your_app_directory_test/logs  
        find . \( ! -regex '.*/\..*' \) -type f -name "*.py" | cut -c 11- $a | xargs -i dbfs cp -r the_app/{} dbfs:/your_app_directory_test/{} --overwrite 

    - name: Test dummy load 
      env: 
        DBR_TOKEN_SECRET_DEV: ${{ secrets.DB_TOKEN_DEV }} 
      run: | 
        export DBR_TOKEN="$DBR_TOKEN_SECRET_DEV" 
        pytest --workers 5 the_app/modules/tests/test_module_tested.py 

    - name: Deploy to the Databricks destination instance 
      run: | 
        find . \( ! -regex '.*/\..*' \) -type f -name "*.py" | cut -c 11- $a | xargs -i dbfs cp -r the_app/{} dbfs:/your_app_directory/{} --overwrite 

Once the test is completed, the output shows a summary on each test:

 

Test dummy load 
======================== 32 passed in 135.50s (0:02:15) ======================== 
Run pytest --workers 5 the_app/modules/tests/test_module_tested.py 
============================= test session starts ============================== 
platform linux -- Python 3.7.8, pytest-6.0.1, py-1.9.0, pluggy-0.13.1 
rootdir: /home/runner/work/<repo_name>/<repo_name> 
plugins: parallel-0.1.0, testmon-1.0.3 
collected 32 items 
pytest-parallel: 5 workers (processes), 1 test per worker (thread) 
................................................................ 
======================== 32 passed in 135.50s (0:02:15) ======================== 

If any of the checks failed, the your_app_directory remains untouched and the developer responsible for the merge needs to check on the exact issue in the pytest logs and fix accordingly. And the app still works!

This is only one of the possible solutions for this problem, and as always the original idea should be only a stub for a broader discussion for any improvements to the procedures described. The tests described cover only the very basic tests that can ensure the application has no major issues like syntax errors, bad directories provided or inaccessible but can be a good base for more complex tests.

Visit our blog for more in-depth Data Engineering articles:

 

Data Engineering

Author

Share this post
Close

Send Feedback