
    Databricks testing with GitHub Actions

    To those who inspired it and will never read it. And for Marcin!

     

    Introduction

    Databricks Connect provides a good way of interacting with Azure Databricks clusters from your local machine (either from an IDE or any custom application). The setup is simple: you provide the URL and a personal access token generated from the Databricks Workspace, and you can start working with DBFS, clusters, etc. This works flawlessly until you want to start running your application code locally, e.g. if you need to use dbutils. For this, you need to generate another personal token with elevated permissions (which can only be done in a Databricks notebook), and it is valid for only 48 hours. For some basic checks this is more than enough, but if you would like to use it in a more automated solution, you have to regenerate this token every two days to keep it working. As I wanted to run integration tests for my Databricks application on every push to the develop branch, I needed to find a more robust solution.
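
    For reference, the local setup is roughly the one below. This is only a sketch, assuming Databricks Connect is already installed and configured (host, token, cluster id); the DBUtils wrapper import is the one shipped with the Databricks Connect pyspark distribution, and calls such as dbutils.fs from a local machine are exactly the part that needs the short-lived elevated-permissions token mentioned above.

    from pyspark.sql import SparkSession

    # With Databricks Connect configured, getOrCreate() attaches to the remote cluster
    spark = SparkSession.builder.getOrCreate()

    # dbutils is not a builtin outside a notebook; Databricks Connect provides a wrapper
    from pyspark.dbutils import DBUtils
    dbutils = DBUtils(spark)

    # Listing a mount only works locally once the elevated-permissions token is in place
    print(dbutils.fs.ls('/mnt'))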

    Need help with Databricks? Check our data engineering services and see how we can help you.

     

    The idea of integration testing for my app

    My application had more than 50 data feeds, each organized in a separate Python class covering end-to-end processing: reading from the source, applying transformations, and saving into the final layer in BLOB storage and an Azure Synapse table. As some business users (power users, actually) were working on the DEV Databricks environment, I wanted a way to ensure that each change merged from a feature branch into the develop branch worked properly before it was actually installed in DEV Databricks.

    Originally, my GitHub workflow released the develop branch each time it detected a push to the application directory. The idea was to test the develop branch before it was released into the application directory and, only if the tests succeeded, release it into the destination directory.

    The new workflow was going to look as follows:

     

    1. Install prerequisites
    2. Deploy to the Databricks TEST directory
    3. Test dummy load using the TEST directory
    4. Deploy to the Databricks destination directory

    The only problem was that I still needed the elevated-permissions token to be able to run my classes locally on the provisioned machine executing the workflow.

     

    Solution – Databricks Jobs API

    The Databricks Jobs API provides the ability to run jobs inside the Databricks Workspace. A job can be a custom task or linked to an existing notebook. Once we set up a notebook that executes all the operations we need inside Databricks, we no longer need the elevated-permissions token and we can set up an automated way to test the feeds.
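
    For illustration only, below is a minimal sketch of submitting a one-time notebook run through the Jobs API 2.0 runs/submit endpoint; the host, cluster id, and notebook path placeholders are assumptions, and the full helper used in this article appears later on.

    import json
    import os

    import requests

    # Placeholders below are assumptions for this sketch
    host = 'https://<region>.azuredatabricks.net/'
    token = os.environ.get('DBR_TOKEN')

    payload = {
        'run_name': 'one-off integration test run',
        'existing_cluster_id': '<cluster_id>',
        'notebook_task': {
            'notebook_path': '/Shared/integration_test_notebook',
            'base_parameters': {'class_name': 'TestDataFeed',
                                'pkg_name': 'modules.data_feed_test'},
        },
    }

    run = requests.post(host + 'api/2.0/jobs/runs/submit',
                        data=json.dumps(payload), auth=('token', token))
    print(run.json()['run_id'])  # used afterwards with runs/get and runs/get-output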

    Let’s consider the simple feed class below. I created it for this article only, so it is missing lots of features; basically, all it does is read from the source and save the data into BLOB storage (both locations mounted inside DBFS):

     

    from pyspark.sql import SparkSession 
    
    class TestDataFeed(object): 
      """ Dummy class for integration testing """ 
    
      def __init__(self, dbutils): 
        self._df = None 
        self._ctx = SparkSession.builder.getOrCreate() 
        self.dbutils = dbutils 
        self.source_dir = '/mnt/test_in/integration_testing/srce_dir' 
        self.destination_dir = '/mnt/test_out/integration_testing/dest_dir' 
    
      def read(self, path=None): 
        path = path or self.source_dir 
        spark = self._ctx 
        opts = dict(format='parquet', compression='snappy') 
    
        self._df = spark.read.load(path, **opts) 
    
        return self 
    
      def transform(self): 
        pass 
    
      def save2blob(self, path=None): 
        path = path or self.destination_dir 
        opts = dict(mode='overwrite', format='parquet', compression='snappy') 
    
        self._df.write.save(path, **opts) 
    
      def run(self): 
        self.read() 
        self.transform() 
        self.save2blob() 

    A simple test for this class would only read from the source directory and count the number of records fetched. The method would look like the one below:

     

    def test_TestDataFeed(): 
        o = TestDataFeed(dbutils) 
        o.read() 
        o.transform() 
        y = o._df.count() 
        assert y>0, "TestDataFeed dummy pull" 

    In general, tests can be more thorough and check the results of the transformations and whether the save into the destination directory and/or the Azure Synapse (ASDW) table succeeded, but for this case we will limit the test to this single check.
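
    As an example, a slightly more thorough variant could also validate the transformed schema and read the saved data back from the destination mount. This is only a sketch; the expected column names are an assumption made for illustration.

    from pyspark.sql import SparkSession

    def test_TestDataFeed_roundtrip():
        o = TestDataFeed(dbutils)
        o.read()
        o.transform()

        # Hypothetical schema check -- the expected column names are an assumption
        expected_columns = {'id', 'load_date'}
        assert expected_columns.issubset(set(o._df.columns)), 'missing expected columns'

        # Save and read back from the destination mount to confirm the write succeeded
        o.save2blob()
        spark = SparkSession.builder.getOrCreate()
        written = spark.read.format('parquet').load(o.destination_dir)
        assert written.count() == o._df.count(), 'row count mismatch after save'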

    Normally, the test method would be placed in a module inside the tests directory, alongside the module being tested:

    •    module_tested.py
         —    class TestDataFeed
    •    tests/
         —    test_module_tested.py
              •    test_TestDataFeed()

    But to be able to use dbutils, we need to modify it a little and place it in a Databricks notebook.

    Instead of creating a notebook for every test function, the notebook will be generic and will read the class name passed as a parameter. Based on the class name, it will import the class dynamically and execute the test. It can look like the example below:

     

    import sys
    sys.path.insert(0, '/dbfs/your_app_directory_test')
    import datetime
    import importlib

    # The class and package to test are passed in by the Jobs API call as notebook parameters
    cls_name = getArgument("class_name")
    pkg_name = getArgument("pkg_name")

    try:
      if not cls_name or not pkg_name:
        raise ImportError('Class name or Package name cannot be empty!')

      d = datetime.datetime.now()
      ex = None

      clss = None
      o = None

      # Import the feed class dynamically and run the dummy pull
      module = importlib.import_module(pkg_name)
      clss = getattr(module, cls_name)
      o = clss(dbutils)
      o.read()
      o.transform()
      y = o._df.count()
      assert y > 0, cls_name + " dummy pull"
    except Exception as e:
      ex = e
    finally:
      # Return None on success, or the exception, as the notebook exit value
      dbutils.notebook.exit(ex)
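
    Before wiring this into CI, the generic notebook can be smoke-tested from another Databricks notebook with dbutils.notebook.run, which passes the same parameters the getArgument calls read. This is a sketch only: the notebook path and timeout are assumptions, and the assertion mirrors the 'None' convention used by the helper further below.

    # Run the generic test notebook synchronously and capture its exit value
    result = dbutils.notebook.run(
        '/Shared/integration_test_notebook',   # assumed path of the notebook above
        180,                                   # timeout in seconds
        {'class_name': 'TestDataFeed', 'pkg_name': 'modules.data_feed_test'},
    )

    # The notebook exits with None on success, so the returned string should be 'None'
    assert result == 'None', result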

    To execute this notebook, we need a small utility that can call the Databricks Jobs API. It can look like this:

     

    import json
    import time

    import requests


    def run_notebook(
        notebook_path,
        cluster_id,
        dbr_credentials,
        base_url,
        cls_name,
        pkg_name,
        ):

        def_timeout_sec = 180       # overall job and polling timeout in seconds
        def_check_timeout_sec = 5   # pause between status checks

        job_name = 'Integration dummy test'

        # Parameters read by the generic test notebook via getArgument()
        base_parameters = {'class_name': cls_name, 'pkg_name': pkg_name}

        job_submit_data = {
            'run_name': job_name,
            'existing_cluster_id': cluster_id,
            'timeout_seconds': def_timeout_sec,
            'notebook_task': {'notebook_path': notebook_path,
                              'base_parameters': base_parameters},
            }

        # Submit a one-time run
        run_cmd = requests.post(base_url + 'api/2.0/jobs/runs/submit',
                                data=json.dumps(job_submit_data),
                                auth=dbr_credentials)

        job_run_id = json.loads(run_cmd.text)['run_id']

        print('Job submitted (Run_id: ' + str(job_run_id) + ')')

        status_cmd = requests.get(base_url + 'api/2.0/jobs/runs/get?run_id='
                                  + str(job_run_id),
                                  auth=dbr_credentials)
        j = json.loads(status_cmd.text)
        print('Run page URL: ' + j['run_page_url'])

        # Poll the run state until it finishes or the timeout is reached
        i = 0
        job_timed_out = False
        while not job_timed_out:
            time.sleep(def_check_timeout_sec)
            status_cmd = requests.get(base_url
                                      + 'api/2.0/jobs/runs/get?run_id='
                                      + str(job_run_id),
                                      auth=dbr_credentials)
            j = json.loads(status_cmd.text)
            job_current_state = j['state']['life_cycle_state']
            job_run_id = j['run_id']
            if job_current_state in ['TERMINATED', 'INTERNAL_ERROR',
                                     'SKIPPED']:
                print('Job status: ' + job_current_state)
                break

            # give up after def_timeout_sec seconds of polling

            if i * def_check_timeout_sec >= def_timeout_sec:
                job_timed_out = True
                status_cmd = requests.get(base_url
                        + 'api/2.0/jobs/runs/get-output?run_id='
                        + str(job_run_id),
                        auth=dbr_credentials)
                j = json.loads(status_cmd.text)
                print('Job status: TIMED OUT')
                print('Job takes more than expected. Please consult the job log for more information:')
                print(j['metadata']['run_page_url'])
                break
            i = i + 1

        if not job_timed_out:
            # The notebook exits with None on success or the exception text on failure
            status_cmd = requests.get(base_url
                                      + 'api/2.0/jobs/runs/get-output?run_id='
                                      + str(job_run_id),
                                      auth=dbr_credentials)
            j = json.loads(status_cmd.text)
            job_output = j['notebook_output']['result']

            assert job_output == 'None', job_output
            print('Job SUCCEEDED')

    As this script can be executed locally without the special token, I changed the test_TestDataFeed method to call run_notebook instead of running the actual test. It goes to Databricks and executes the notebook that runs my tests for me.

    The excerpt will now look like the one below. Keep in mind that I am passing the Databricks token as an environment variable, as it is kept in GitHub secrets.

     

    import os

    BASE_URL = '<base_url_of_databricks>'
    NOTEBOOK_PATH = '<notebook_path_of_your_test_notebook>'
    CLUSTER_ID = '<id_of_the_cluster_to_run_on>'
    DBR_TOKEN = os.environ.get('DBR_TOKEN')
    DBR_CREDENTIALS = ('token', DBR_TOKEN)
    
    def test_TestDataFeed(): 
        run_notebook( 
            cluster_id=CLUSTER_ID, 
            dbr_credentials=DBR_CREDENTIALS, 
            base_url=BASE_URL, 
            notebook_path=NOTEBOOK_PATH, 
            cls_name='TestDataFeed', 
            pkg_name='modules.data_feed_test', 
            ) 

    The GitHub Actions workflow then executes pytest to run the tests. The complete workflow consists of the following steps:

     

    name: Continuous Deployment - Databricks 
    
    on: 
      push: 
        branches: [ develop, master ] 
    
    jobs: 
      build: 
    
        runs-on: ubuntu-latest 
    
        steps: 
        - uses: actions/checkout@v2 
    
        - name: Set up Python 3.7 
          uses: actions/setup-python@v2 
          with: 
            python-version: 3.7 
    
        - name: Install prerequisites 
          run: | 
            pip install -r your_app_directory/requirements.txt 
            cat > ~/.databrickscfg <<EOF 
            [DEFAULT] 
            host = https://eastus2.azuredatabricks.net 
            token = $([ ${GITHUB_REF##*/} == "master" ] && echo "${{ secrets.DB_TOKEN_PROD }}" || echo "${{ secrets.DB_TOKEN_DEV }}") 
            EOF 
    
        - name: Deploy to the Databricks test instance 
          run: | 
            dbfs rm -r dbfs:/your_app_directory_test 
            dbfs mkdirs dbfs:/your_app_directory_test/logs  
            find . \( ! -regex '.*/\..*' \) -type f -name "*.py" | cut -c 11- | xargs -i dbfs cp -r the_app/{} dbfs:/your_app_directory_test/{} --overwrite 
    
        - name: Test dummy load 
          env: 
            DBR_TOKEN_SECRET_DEV: ${{ secrets.DB_TOKEN_DEV }} 
          run: | 
            export DBR_TOKEN="$DBR_TOKEN_SECRET_DEV" 
            pytest --workers 5 the_app/modules/tests/test_module_tested.py 
    
        - name: Deploy to the Databricks destination instance 
          run: | 
            find . \( ! -regex '.*/\..*' \) -type f -name "*.py" | cut -c 11- | xargs -i dbfs cp -r the_app/{} dbfs:/your_app_directory/{} --overwrite 

    Once the test step is completed, the output shows a summary of all tests:

     

    Test dummy load 
    Run pytest --workers 5 the_app/modules/tests/test_module_tested.py 
    ============================= test session starts ============================== 
    platform linux -- Python 3.7.8, pytest-6.0.1, py-1.9.0, pluggy-0.13.1 
    rootdir: /home/runner/work/<repo_name>/<repo_name> 
    plugins: parallel-0.1.0, testmon-1.0.3 
    collected 32 items 
    pytest-parallel: 5 workers (processes), 1 test per worker (thread) 
    ................................................................ 
    ======================== 32 passed in 135.50s (0:02:15) ======================== 

    If any of the checks fail, your_app_directory remains untouched, and the developer responsible for the merge needs to find the exact issue in the pytest logs and fix it accordingly. And the app still works!

    This is only one possible solution to this problem, and as always the original idea should be treated as a stub for a broader discussion about improvements to the procedures described. The tests shown cover only the very basic checks that ensure the application has no major issues, such as syntax errors or directories that are wrong or inaccessible, but they can be a good base for more complex tests.

    Visit our blog for more in-depth Data Engineering articles.

     
