• 22 September 2020
  • Zbyszek Pomianowski
  • Databricks

First things first

In the data science and Python world, pandas is a very well-known package.

But… what is pandas?

pandas provides essential data structures such as Series and DataFrame (older releases also had Panel, which has since been removed) that help in manipulating data sets and time series. Its massive set of operations makes the library a very powerful tool. It is free and open source, which has helped make it one of the most widely used data science libraries in the world.
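As a minimal sketch of the two core structures, the snippet below builds a Series and a DataFrame from a few made-up values (the names and ages are purely illustrative) and runs two typical operations on them.

```python
import pandas as pd

# A Series is a labelled one-dimensional array.
ages = pd.Series([23, 15, 17], index=['Marysia', 'Kasia', 'Wojtek'], name='age')

# A DataFrame is a table of named columns that share one index.
people = pd.DataFrame({'age': ages, 'adult': ages >= 18})

oldest = ages.idxmax()             # label of the largest value
adults = people[people['adult']]   # boolean filtering keeps the matching rows
```

Both objects carry their index along, which is what makes label-based selection and alignment possible.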

Pandas architecture

  • pandas/core – data structures
  • pandas/src – basic functionality written in C/Cython.
  • pandas/io – tools for input and output (files, data, etc.)
  • pandas/tools – code and algorithms for various functions and operations (e.g. merge, join, concatenation)
  • pandas/sparse – sparse versions of the various data structures, i.e. versions made to handle missing values
  • pandas/stats – functions related to statistics, like linear regression
  • pandas/util – testing tools and various other utilities for debugging the library
  • pandas/rpy – an interface for connecting to R, built on rpy2

Key features

  • data manipulation
  • handling missing values
  • file format support
  • data cleaning
  • visualization
  • Python support
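Two of these features, handling missing values and data manipulation, can be sketched in a few lines. The sample values below are invented for illustration only.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    'sex': ['male', 'female', 'female', 'male'],
    'age': [43.0, np.nan, 9.0, 57.0],
})

missing = int(df['age'].isna().sum())          # count the missing ages
filled = df['age'].fillna(df['age'].mean())    # replace NaN with the column mean
mean_by_sex = df.groupby('sex')['age'].mean()  # NaN values are skipped by default
```

Whether to fill or drop missing values depends on the data set; the mean is used here only as a simple example.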

Simple code examples

Writing excel files – generating test sample

import numpy as np
import pandas as pd

sample_nb = int(1e1)  # number of rows in the test sample
df = pd.DataFrame(
    {'age': np.random.randint(0, 101, sample_nb),  # ages from 0 to 100 inclusive
     'sex': np.random.choice(['male', 'female'], sample_nb)})

# the context manager saves and closes the workbook on exit
with pd.ExcelWriter('pandas_simple.xlsx', engine='openpyxl') as writer:
    df.to_excel(writer, sheet_name='sheet1')

Reading excel files

import pandas as pd
df = pd.read_excel('pandas_simple.xlsx', sheet_name='sheet1')

df

Result:

    Unnamed: 0  age sex
0   0           43  female
1   1           57  male
2   2           9   female
3   3           56  female
4   4           75  male
5   5           6   female
6   6           49  male
7   7           15  male
8   8           19  female
9   9           63  male

df.sex.unique()

Result:

array(['female', 'male'], dtype=object)

df.mean(numeric_only=True)  # skip the non-numeric 'sex' column

Result:

Unnamed: 0     4.5
age           39.2
dtype: float64
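The "Unnamed: 0" column in the results above is the DataFrame index that was written to the file and read back as ordinary data. The sketch below reproduces the effect with an in-memory CSV round trip (chosen here only so that it runs without an Excel engine) and shows two ways around it; to_excel and read_excel accept the same index and index_col parameters.

```python
import io
import pandas as pd

df = pd.DataFrame({'age': [43, 57, 9], 'sex': ['female', 'male', 'female']})

# Default round trip: the index is written as an unnamed first column.
with_index = pd.read_csv(io.StringIO(df.to_csv()))

# Option 1: do not write the index at all.
no_index = pd.read_csv(io.StringIO(df.to_csv(index=False)))

# Option 2: tell the reader that the first column is the index.
as_index = pd.read_csv(io.StringIO(df.to_csv()), index_col=0)
```

With either option, aggregations such as the mean no longer include the spurious index column.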

Koalas

The main intention of this project is to give data scientists who use pandas a way to scale their existing workloads to big data by running them on Apache Spark™ without significantly modifying their code.
The Koalas project makes it possible to use the pandas API with big data by implementing the pandas DataFrame API on top of Apache Spark.
Pandas is the de facto standard (single-node) DataFrame implementation in Python, while Spark is the standard for big data processing.
This means that, in theory, POC work written in pandas should be very easy to migrate to a Spark environment.
If you are already familiar with pandas, you can be productive with Spark right away, with hardly any learning curve.

  • pandas – tests, smaller datasets
  • koalas – big distributed datasets
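One way to picture the migration is a function that receives the DataFrame library as an argument. This is only a sketch: mean_age_by_sex and its lib parameter are hypothetical names, and the snippet runs with pandas alone. On a cluster with Koalas installed, the same call could presumably be made with the module imported as databricks.koalas, assuming the operations used are among those Koalas implements.

```python
import pandas as pd

def mean_age_by_sex(lib, data):
    """Build a DataFrame with the given library and aggregate it."""
    df = lib.DataFrame(data)
    return df.groupby('sex')['age'].mean()

data = {'age': [43, 57, 9, 75], 'sex': ['female', 'male', 'female', 'male']}

# Single-node run with pandas; passing the Koalas module instead is the
# (assumed) distributed variant and is not executed here.
result = mean_age_by_sex(pd, data)
```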

Warning: Koalas implements ~80% of all pandas APIs (as of June 2020).

Koalas is best used with the latest Apache Spark (3.0+):

  • Python 3.8
  • Spark accessor
  • new type hints
  • better in-place operations

Internally, Koalas DataFrames are built on the PySpark DataFrame API. Koalas translates pandas APIs into a Spark SQL logical plan, which is then optimized and executed by the sophisticated and robust Spark SQL engine. Koalas also uses lazy evaluation semantics to maximize performance. To stay compliant with the pandas DataFrame structure and its rich APIs, which require an implicit ordering, Koalas DataFrames carry internal metadata that represents pandas-equivalent indices and column labels mapped to the columns of the underlying PySpark DataFrame.
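The idea of lazy evaluation can be illustrated with a toy class that only records operations and runs them when the result is requested. LazyTable is a hypothetical name invented for this sketch; it is not part of Koalas or Spark and says nothing about how they are implemented.

```python
class LazyTable:
    """Toy stand-in for a lazily evaluated table (hypothetical, not a Koalas class)."""

    def __init__(self, rows, plan=()):
        self._rows = rows
        self._plan = plan  # recorded operations, not yet executed

    def filter(self, predicate):
        return LazyTable(self._rows, self._plan + (('filter', predicate),))

    def select(self, *columns):
        return LazyTable(self._rows, self._plan + (('select', columns),))

    def explain(self):
        # Show the recorded plan without running anything.
        return [name for name, _ in self._plan]

    def collect(self):
        # Only now are the recorded steps applied to the data.
        rows = self._rows
        for name, arg in self._plan:
            if name == 'filter':
                rows = [r for r in rows if arg(r)]
            else:
                rows = [{c: r[c] for c in arg} for r in rows]
        return rows

rows = [{'name': 'Marysia', 'age': 23}, {'name': 'Kasia', 'age': 15}]
query = LazyTable(rows).filter(lambda r: r['age'] >= 18).select('name')
```

Because nothing runs until collect is called, an engine has the whole plan available and can reorder or merge steps before touching the data.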


Let's play with Databricks

To make things harder, we will use Excel files 🙂 Someone wise once said that real big data always involves Excel at some point. Ideally that would be only at the very end of the workflow, but real life quickly proves otherwise.

Koalas is a high-level library, so it only needs to be installed on the driver. Nevertheless, its dependencies still have to be installed on the worker nodes:

  • PySpark >= 2.4 – driver + nodes
  • pyarrow >= 0.10 – driver + nodes
  • pandas >= 0.23 – driver
  • matplotlib >= 3.0.0 – driver

Installation and imports:

dbutils.library.installPyPI("koalas")
dbutils.library.installPyPI("openpyxl")
dbutils.library.restartPython()

import numpy as np
import pandas as pd
import databricks.koalas as ks

Now we generate some sample data that would be quite heavy to process on a single machine. This is a very unusual source, so the generating code does not leverage the power of Spark:

import numpy as np
import pandas as pd
import databricks.koalas as ks

def sample_dfs(files_nb):
    # yield one random DataFrame per output file
    for i in range(files_nb):
        sample_nb = int(1e5)
        yield ks.DataFrame(
            {'age': np.random.randint(0, 100, sample_nb),
             'sex': np.random.choice(['male', 'female'], sample_nb)})

# write 100 Excel files; the context manager saves and closes each workbook
for idx, df in enumerate(sample_dfs(100)):
    with pd.ExcelWriter('/dbfs/pandas_simple_%03d.xlsx' % idx, engine='openpyxl') as writer:
        df.to_excel(writer, sheet_name='sheet1')

Even such an unpleasant source can be loaded right away!

kdf = ks.read_excel('file:/dbfs/pandas_simple_*', engine='openpyxl')
kdf.to_spark().count()

Result:

10000000

kdf.count()

Result:

Unnamed: 0    10000
age           10000
sex           10000
dtype: int64

To prove that Koalas leverages Spark, we can print the query plan through the Spark accessor:

kdf.spark.explain(extended=True)

Result:

== Parsed Logical Plan ==
Project [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
+- Project [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188, monotonically_increasing_id() AS __natural_order__#32197L]
   +- MapInPandas <lambda>(content#32178), [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
      +- Project [content#32178]
         +- Relation[path#32175,modificationTime#32176,length#32177L,content#32178] binaryFile

== Analyzed Logical Plan ==
__index_level_0__: bigint, Unnamed: 0: bigint, age: bigint, sex: string
Project [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
+- Project [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188, monotonically_increasing_id() AS __natural_order__#32197L]
   +- MapInPandas <lambda>(content#32178), [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
      +- Project [content#32178]
         +- Relation[path#32175,modificationTime#32176,length#32177L,content#32178] binaryFile

== Optimized Logical Plan ==
MapInPandas <lambda>(content#32178), [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
+- Project [content#32178]
   +- Relation[path#32175,modificationTime#32176,length#32177L,content#32178] binaryFile

== Physical Plan ==
MapInPandas <lambda>(content#32178), [__index_level_0__#32185L, Unnamed: 0#32186L, age#32187L, sex#32188]
+- FileScan binaryFile [content#32178] Batched: false, DataFilters: [], Format: org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat@128c5b73, Location: InMemoryFileIndex[file:/dbfs/pandas_simple_003.xlsx, file:/dbfs/pandas_simple_009.xlsx, file:/dbf..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<content:binary>

When viewing the DAG for this DataFrame, we can observe a new kind of operation within the first stage:

FileScan binaryFile -> MapInPandas
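The plan suggests that each file is first read as raw bytes and then handed to a pandas function. The sketch below imitates that shape locally and is not Koalas' actual implementation: parse_files is a hypothetical name, and CSV bytes stand in for Excel content so the snippet runs without an Excel engine. On Spark 3.0+, a function of this shape could be passed to DataFrame.mapInPandas together with an output schema.

```python
import io
import pandas as pd

def parse_files(batches):
    """Yield one parsed DataFrame per binary file found in the incoming batches."""
    for batch in batches:                 # each batch is a pandas DataFrame
        for content in batch['content']:  # raw bytes of one file
            yield pd.read_csv(io.BytesIO(content))

# Local simulation of what an executor might receive from the binaryFile scan.
files = pd.DataFrame({'content': [b'age,sex\n43,female\n',
                                  b'age,sex\n57,male\n9,female\n']})
parsed = pd.concat(parse_files(iter([files])), ignore_index=True)
```

The parsing itself is still done by pandas, one file at a time; Spark only distributes the files across the executors.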

Configuration

Koalas comes with a set of configuration options, which can even switch processing between pandas and PySpark depending on thresholds. Please refer to the Koalas documentation for the full list. Particular options can have a huge influence on the final processing time.

Options can be set globally or for a part of the code by using a context manager:

import databricks.koalas as ks
from databricks.koalas.config import get_option, set_option, reset_option

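# example: read, change and reset a single option
opt_value = get_option('compute.max_rows')
print('check', opt_value)
set_option('compute.max_rows', 2000)
opt_value = get_option('compute.max_rows')
print('after change', opt_value)
reset_option('compute.max_rows')
opt_value = get_option('compute.max_rows')  # re-read, otherwise the old value is printed
print('after reset', opt_value)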

# or
ks.options.compute.max_rows = 2000
print(ks.options.compute.max_rows)

# koalas options used only in following context
with ks.option_context('display.max_rows', 10, 'compute.max_rows', 5):
    print(ks.get_option('display.max_rows'))
    print(ks.get_option('compute.max_rows'))
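The function names used here appear to mirror the pandas options API, so the scoping behaviour of the context manager can be tried out with pandas alone. The snippet below is such a pandas-only sketch; it is an analogy and does not touch any Koalas option.

```python
import pandas as pd

before = pd.get_option('display.max_rows')

with pd.option_context('display.max_rows', 10):
    inside = pd.get_option('display.max_rows')  # temporary value

after = pd.get_option('display.max_rows')       # restored on exit
```

Scoping an option this way avoids leaking a changed setting into unrelated parts of a notebook.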

Interoperability between pandas and Spark

There are a couple of ways to use Spark functionality on a Koalas DataFrame:

import databricks.koalas as ks

sdf = spark.createDataFrame(
  [(1, 23, 'Marysia'),
   (2, 15, 'Kasia'),
   (3, 17, 'Wojtek')],
  schema=['index', 'age', 'name'])
sdf.show()

# +-----+---+-------+
# |index|age|   name|
# +-----+---+-------+
# |    1| 23|Marysia|
# |    2| 15|  Kasia|
# |    3| 17| Wojtek|
# +-----+---+-------+

# convert spark-df into koalas-df
kdf = sdf.to_koalas()

# convert koalas-df into spark-df
sdf = kdf.to_spark()

# spark accessor
from pyspark.sql import functions as F
kdf['age_sqrt'] = kdf.age.spark.transform(lambda col: F.sqrt(col))

kdf

#   index   age     name        age_sqrt
# 0 1       23      Marysia     4.795832
# 1 2       15      Kasia       3.872983
# 2 3       17      Wojtek      4.123106

# apply
kdf.spark.apply(
    lambda sdf: sdf.selectExpr('age + 10 as age_next_10_years'))

#   age_next_10_years
# 0 33
# 1 25
# 2 27

Note that apply from the spark accessor handles the whole DataFrame within the lambda function, whereas transform works on a single column only.
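For comparison, the same two results can be computed on a single node with plain pandas. This is only a sketch of the column-level versus frame-level distinction; it does not use the spark accessor.

```python
import numpy as np
import pandas as pd

pdf = pd.DataFrame({'index': [1, 2, 3],
                    'age': [23, 15, 17],
                    'name': ['Marysia', 'Kasia', 'Wojtek']})

# column-level operation, the counterpart of the transform example
pdf['age_sqrt'] = np.sqrt(pdf['age'])

# frame-level operation, the counterpart of the apply example
next_10 = pdf[['age']].add(10).rename(columns={'age': 'age_next_10_years'})
```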

Comment

Koalas is still quite new. Nevertheless, it can be a very useful library, especially when a POC project based on pandas needs to be migrated quickly into a full-blown big data environment such as Spark in order to work at a much bigger scale.