
    First things first

    In the data science and Python world there is a very well-known package called pandas.

    But… what is pandas?

    Pandas provides essential data structures such as Series, DataFrames and Panels, which help in manipulating data sets and time series. Its massive set of built-in operations makes this library a very powerful tool. It is free to use and open source, making it one of the most widely used data science libraries in the world. Find out how our data engineering services can help your business.

     

    Pandas architecture

     

    • pandas/core – data structures
    • pandas/src – basic functionality written in C/Cython
    • pandas/io – tools for input and output: files, data, etc.
    • pandas/tools – code and algorithms for various functions and operations (e.g. merge, join, concatenation, etc.)
    • pandas/sparse – the sparse versions of various data structures, e.g. versions made to handle missing values
    • pandas/stats – functions related to statistics, like linear regression
    • pandas/util – testing tools and various other utilities for debugging the library
    • pandas/rpy – interface which helps to connect to R (via rpy2)

     

    Key features

     

    • data manipulation (see the sketch after this list)
    • handling missing values
    • file format support
    • data cleaning
    • visualization
    • Python support
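
    As a quick illustration of the first two features, here is a minimal sketch; the column names and values below are made up for the example:

    import numpy as np
    import pandas as pd
    
    # hypothetical sample frame with a missing value
    df = pd.DataFrame({'age': [23, np.nan, 17], 'sex': ['female', 'male', 'male']})
    
    # handling missing values: drop the incomplete rows or impute them
    df_dropped = df.dropna()
    df_filled = df.fillna({'age': df['age'].mean()})
    
    # simple data manipulation: filtering and grouping
    adults = df_filled[df_filled['age'] >= 18]
    print(adults)
    print(df_filled.groupby('sex')['age'].mean())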

     


     

    Simple code examples

     

    Writing Excel files – generating a test sample

     

    import numpy as np
    import pandas as pd
    
    # requires the openpyxl package for the xlsx engine
    writer = pd.ExcelWriter('pandas_simple.xlsx', engine='openpyxl')
    
    sample_nb = int(1e1)
    df = pd.DataFrame(
        {'age': np.random.randint(0, 100, sample_nb),
         'sex': np.random.choice(['male', 'female'], sample_nb)})
    
    df.to_excel(writer, sheet_name='sheet1')
    
    writer.save()

     

    Reading Excel files

     

    import pandas as pd
    df = pd.read_excel('pandas_simple.xlsx', sheet_name='sheet1')
    
    df

    Result:

     

        Unnamed: 0  age sex
    0   0           43  female
    1   1           57  male
    2   2           9   female
    3   3           56  female
    4   4           75  male
    5   5           6   female
    6   6           49  male
    7   7           15  male
    8   8           19  female
    9   9           63  male

     

    df.sex.unique()

    Result:

     

    array(['female', 'male'], dtype=object)

     

    df.mean()

    Result:

     

    Unnamed: 0     4.5
    age           39.2
    dtype: float64

     


     

    Koalas

    The main intention of this project is to provide data scientists using pandas with a way to scale their existing big data workloads by running them on Apache Spark™ without significantly modifying their code.
    The Koalas project makes it possible to use the pandas API with big data by implementing the pandas DataFrame API on top of Apache Spark.
    Pandas is the de facto standard (single-node) DataFrame implementation in Python, while Spark is the standard for big data processing.
    It means that, in theory, POC work written in pandas should be very easy to migrate into a Spark environment.
    If you are already familiar with pandas, you can be immediately productive with Spark, with no learning curve.
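
    To make this concrete, here is a minimal sketch (with made-up column names) showing that the same DataFrame code runs on pandas and on Koalas; only the import changes:

    import pandas as pd
    import databricks.koalas as ks
    
    data = {'age': [23, 15, 17], 'sex': ['female', 'female', 'male']}
    
    # single-node pandas
    pdf = pd.DataFrame(data)
    print(pdf.groupby('sex')['age'].mean())
    
    # the same API, but executed by Spark under the hood
    kdf = ks.DataFrame(data)
    print(kdf.groupby('sex')['age'].mean())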

     

    • pandas – tests, smaller datasets
    • koalas – big distributed datasets

    Warning: Koalas implements ~80% of all pandas APIs [June 2020].

    Koalas is advised to be used with the latest Apache Spark 3.0+:

     

    • Python 3.8
    • Spark accessor
    • new type hints
    • better in-place operations

    Internally, Koalas DataFrames are built on the PySpark DataFrame API. Koalas translates pandas APIs into the logical plan of Spark SQL, which is then optimized and executed by the sophisticated and robust Spark SQL engine. Koalas also uses lazy evaluation semantics to maximize performance. To provide a compliant pandas DataFrame structure with its rich APIs that require an implicit ordering, Koalas DataFrames keep internal metadata representing pandas-equivalent indices and column labels mapped to the columns of the underlying PySpark DataFrame.
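
    A small sketch of the lazy evaluation mentioned above (assuming a running Spark session with Koalas installed): pandas-style transformations only build a Spark SQL plan, and the job runs when the result is actually materialized:

    import databricks.koalas as ks
    
    kdf = ks.DataFrame({'age': [23, 15, 17], 'sex': ['female', 'female', 'male']})
    
    # a pandas-style transformation: only a Spark SQL logical plan is built here
    adults = kdf[kdf['age'] >= 18]
    
    # materializing the result (e.g. converting to pandas) triggers the actual Spark job
    print(adults.to_pandas())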

     


     

    Let's play with Databricks

    To make things harder, we will use Excel files 🙂 Someone wise once said that real big data always involves Excel at some point. The best would be to have it at the very end of the workflow, but life quickly verifies such assumptions.

    Koalas is a high-level library, so we can install it only on the driver side. Nevertheless, we still need to have its dependencies installed on the nodes:

     

    • PySpark >= 2.4 – driver + nodes
    • pyarrow >= 0.10 – driver + nodes
    • pandas >= 0.23 – driver
    • matplotlib >= 3.0.0 – driver

    Installation and imports:

     

    dbutils.library.installPyPI("koalas")
    dbutils.library.installPyPI("openpyxl")
    dbutils.library.restartPython()
    
    import numpy as np
    import pandas as pd
    import databricks.koalas as ks

    Now we generate some sample data that would be considered quite heavy to process on a single machine. Excel is a very unusual source, so the generating code does not leverage Spark's framework power:

     

    import numpy as np
    import pandas as pd
    import databricks.koalas as ks
    
    def sample_dfs(files_nb):
        for i in range(files_nb):
            sample_nb = int(1e5)
            yield ks.DataFrame(
                {'age': np.random.randint(0, 100, sample_nb),
                 'sex': np.random.choice(['male', 'female'], sample_nb)})
    
    for idx, df in enumerate(sample_dfs(100)):
        writer = pd.ExcelWriter('/dbfs/pandas_simple_%03d.xlsx' % idx, engine='openpyxl')
        df.to_excel(writer, sheet_name='sheet1')
        writer.save()

    Such an unpleasant source can be loaded right away!

     

    kdf = ks.read_excel('file:/dbfs/pandas_simple_*', engine='openpyxl')
    kdf.to_spark().count()

    Result:

     

    10000000

     

    kdf.count()

    Result:

     

    Unnamed: 0    10000
    age           10000
    sex           10000
    dtype: int64

    To prove that Koalas leverages Spark, we can use:

     

    kdf.explain(extended=True)

    Result:

     

    == Parsed Logical Plan ==
    Project [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
    +- Project [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188, monotonically_increasing_id() AS __natural_order__#32197L]
       +- MapInPandas <lambda>(content#32178), [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
          +- Project [content#32178]
             +- Relation[path#32175,modificationTime#32176,length#32177L,content#32178] binaryFile
    
    == Analyzed Logical Plan ==
    __index_level_0__: bigint, Unnamed: 0: bigint, age: bigint, sex: string
    Project [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
    +- Project [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188, monotonically_increasing_id() AS __natural_order__#32197L]
       +- MapInPandas <lambda>(content#32178), [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
          +- Project [content#32178]
             +- Relation[path#32175,modificationTime#32176,length#32177L,content#32178] binaryFile
    
    == Optimized Logical Plan ==
    MapInPandas <lambda>(content#32178), [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
    +- Project [content#32178]
       +- Relation[path#32175,modificationTime#32176,length#32177L,content#32178] binaryFile
    
    == Physical Plan ==
    MapInPandas <lambda>(content#32178), [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
    +- FileScan binaryFile [content#32178] Batched: false, DataFilters: [], Format: org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat@128c5b73, Location: InMemoryFileIndex[file:/dbfs/pandas_simple_003.xlsx, file:/dbfs/pandas_simple_009.xlsx, file:/dbf..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<content:binary>

    When viewing the DAG for this DataFrame, we can observe a new kind of operation within the first stage:

    FileScan binaryFile -> MapInPandas

     

    Configuration

    Koalas comes with a set of configuration options which can even toggle the processing type between pandas and PySpark depending on thresholds. Please refer to the docs here. Particular options can have a huge influence on the final processing time.

    Options can be set globally or only for a part of the code by using context managers:

     

    import databricks.koalas as ks
    from databricks.koalas.config import get_option, set_option, reset_option
    
    # example
    opt_value = get_option('compute.max_rows')
    print('check', opt_value)
    set_option('compute.max_rows', 2000)
    opt_value = get_option('compute.max_rows')
    print('after change', opt_value)
    reset_option('compute.max_rows')
    opt_value = get_option('compute.max_rows')
    print('after reset', opt_value)
    
    # or
    ks.options.compute.max_rows = 2000
    print(ks.options.compute.max_rows)
    
    # koalas options used only in following context
    with ks.option_context('display.max_rows', 10, 'compute.max_rows', 5):
        print(ks.get_option('display.max_rows'))
        print(ks.get_option('compute.max_rows'))

     

    Interoperability between pandas and Spark

    There are a couple of ways to use Spark functionality on a Koalas DataFrame:

     

    import databricks.koalas as ks
    
    sdf = spark.createDataFrame(
      [(1, 23, 'Marysia'),
       (2, 15, 'Kasia'),
       (3, 17, 'Wojtek')],
      schema=['index', 'age', 'name'])
    sdf.show()
    
    # +-----+---+-------+
    # |index|age|   name|
    # +-----+---+-------+
    # |    1| 23|Marysia|
    # |    2| 15|  Kasia|
    # |    3| 17| Wojtek|
    # +-----+---+-------+
    
    # convert spark-df into koalas-df
    kdf = sdf.to_koalas()
    
    # convert koalas-df into spark-df
    sdf = kdf.to_spark()
    
    # spark accessor
    from pyspark.sql import functions as F
    kdf['age_sqrt'] = kdf.age.spark.transform(lambda col: F.sqrt(col))
    
    kdf
    
    #   index   age     name        age_sqrt
    # 0 1       23      Marysia     4.795832
    # 1 2       15      Kasia       3.872983
    # 2 3       17      Wojtek      4.123106
    
    # apply
    kdf.spark.apply(
        lambda sdf: sdf.selectExpr('age + 10 as age_next_10_years'))
    
    #   age_next_10_years
    # 0 33
    # 1 25
    # 2 27

    Note that apply from the spark accessor is used to handle the whole DataFrame within the lambda function, while transform is applied only to a particular column.

     

    Comment

    Koalas is still quite new. Nevertheless, it can be a very useful library, especially when a POC project based on pandas requires fast migration into a full-blown big data environment such as Spark in order to work at a much bigger scale.

    Visit our blog for more in-depth Data Engineering articles:

    Data Engineering