Optimizing Apache Spark
Apache Spark is a powerful tool for data processing that can deliver orders-of-magnitude improvements in execution time compared to Hadoop’s MapReduce or single-node processing. At times, however, whether because of habits carried over from procedural processing systems or simple unfamiliarity with the framework, Spark jobs can struggle with even moderate amounts of data and simple joins. Before you start adding more nodes to your computing cluster or equipping the existing ones with extra RAM (something not every developer can do), here are 5 aspects of the Spark API you might want to consider to make sure you are using the framework the way it’s meant to be used:
Data at Rest
Managing the storage strategy of your input/output/temp data is a key to not only optimizing your own application, but also every other application that is using the same data for its processing. This strategy consists of – among others – picking the right storage system, file formats and partitioning of the data.
As a developer, you probably don’t have much influence over the type of storage system your application has to work with. However, it’s important to keep in mind how a particular storage system performs in your specific scenario, e.g. its throughput, the cost of modifying data, etc.
File formats supported by Apache Spark
Since Spark supports a wide variety of data formats, it’s easy to commit to something that’s sub-optimal just because we’re somewhat familiar with the chosen type, and disregard the potential performance savings of others, often tailor-made for distributed computing.
As a rule, prefer binary, columnar data formats (e.g. Parquet) over text-based alternatives (e.g. CSV). Not only are they faster to load, they also let you shift your focus from handling edge cases (escape characters, newlines, etc.) to other, more productive tasks.
Splittable file types
It’s easy to overlook that some file formats can’t be easily split into chunks, which prevents you from taking advantage of the distributed nature of the Spark framework. Files such as ZIP archives or multi-line JSON documents have to be read in their entirety before they can be interpreted, whereas line-delimited JSON (one record per line) can be split.
Make sure the files you are loading to Spark are splittable in that sense.
Knowing how your tables will most often be read by end users (and by yourself) may allow you to intelligently split the raw table data into partitions by the key you will query on. Partitioning, in certain scenarios, limits the total amount of data loaded into your Spark cluster, which can have a dramatic impact on the speed of code execution.
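To make the idea concrete, here is a minimal sketch in plain Python (no Spark required) of a directory-per-key layout, similar in spirit to what Spark produces when you write with partitionBy(). The helper names (write_partitioned, read_pruned), the sample orders, and the choice of CSV are assumptions made purely for illustration; the point is only that a query filtering on the partition key needs to open a fraction of the files.

```python
import csv
import tempfile
from collections import defaultdict
from pathlib import Path


def write_partitioned(rows, root, key):
    """Write rows into one sub-directory per distinct value of `key`."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    for value, group in groups.items():
        part_dir = Path(root) / f"{key}={value}"
        part_dir.mkdir(parents=True, exist_ok=True)
        # The partition column lives in the directory name, not in the file.
        fields = [f for f in group[0] if f != key]
        with open(part_dir / "part-0.csv", "w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=fields)
            writer.writeheader()
            for row in group:
                writer.writerow({f: row[f] for f in fields})


def read_pruned(root, key, wanted):
    """Read only the sub-directory whose name matches the filter value."""
    files_opened = 0
    rows = []
    for path in sorted((Path(root) / f"{key}={wanted}").glob("*.csv")):
        files_opened += 1
        with open(path, newline="") as fh:
            for row in csv.DictReader(fh):
                # Restore the partition column from the directory name.
                rows.append({**row, key: wanted})
    return rows, files_opened


# Hypothetical sample data.
orders = [
    {"order_id": "1", "country": "PL", "amount": "10"},
    {"order_id": "2", "country": "DE", "amount": "25"},
    {"order_id": "3", "country": "PL", "amount": "7"},
    {"order_id": "4", "country": "FR", "amount": "12"},
]

with tempfile.TemporaryDirectory() as root:
    write_partitioned(orders, root, key="country")
    total_files = len(list(Path(root).glob("*/*.csv")))
    polish_orders, files_opened = read_pruned(root, "country", "PL")
```

Here the filter on country touches one file out of three. The trade-off is that queries filtering on any other column still have to scan every directory, which is why the partition key should match the way the table is most often queried.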
Data skewness in Apache Spark
“Skewness” is a statistical term used to describe the asymmetry of the data in a given distribution. In Big Data, we call data “skewed” when it’s not uniformly distributed, and thus is hard to evenly parallelize. This particularly impacts the Join operations in your code.
An example might be a table where one specific key is assigned to a disproportionate number of rows. When performing a join on this key, Spark will send all records with the same join key to the same partition. Thus, all rows for our “popular” key are forced to be processed within one task, which often implies a major shuffle overhead.
You can watch for these situations in the Spark UI, which shows what goes on under the hood during execution. If you identify a task in your job that takes much longer than the others and shows large values for Shuffle Read Size / Records, it is likely that one node is doing most of the heavy lifting during your processing, most probably because of data skew.
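The effect can also be reproduced on paper before looking at the Spark UI. The sketch below is a plain-Python model, not Spark code: it assigns each row to a partition by hashing its key and counts the rows per partition. The CRC32-based partition_of function is a stand-in chosen for determinism (Spark uses its own hash function), and the customer keys are made up for illustration.

```python
import zlib
from collections import Counter


def partition_of(key, num_partitions):
    """Stable stand-in for a hash partitioner: same key, same partition."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def partition_sizes(keys, num_partitions):
    """Number of rows that land in each partition."""
    sizes = Counter(partition_of(k, num_partitions) for k in keys)
    return [sizes.get(p, 0) for p in range(num_partitions)]


# Hypothetical skewed table: one key owns 9,000 of the 10,000 rows.
keys = ["customer_0"] * 9_000 + [f"customer_{i}" for i in range(1, 1_001)]

sizes = partition_sizes(keys, num_partitions=8)
largest_share = max(sizes) / len(keys)   # fraction of rows in the busiest partition
top_key, top_count = Counter(keys).most_common(1)[0]
```

Because every row with the same key hashes to the same partition, the busiest partition holds at least 90% of the rows no matter how many partitions you configure. Counting rows per join key in the same way (in Spark, a group-by with a count, sorted descending) is a cheap check to run when one task lags behind the rest.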
Ways to mitigate this:
- Use of Broadcast Joins instead of Sort Merge Joins
If you’re joining two tables, one of which is considerably smaller than the other (small enough to fit into the executor’s memory), you might want to “hint” Spark into using a Broadcast Join instead of a Sort Merge Join. This can result in dramatic improvements in join times for skewed data. More on join “hints”: https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html#join-hints
Note that Spark does plan Broadcast joins automatically, provided that one of the join sides is smaller than the broadcast join threshold (spark.sql.autoBroadcastJoinThreshold property).
- Key salting
The idea behind key salting is to introduce additional, artificial variability to your key value. This might be achieved by introducing uniformly assigned “buckets” to your main data table, exploding your right-side table with the same range of buckets (copying the values other than the bucket no.), and performing a join over key AND bucket. This will allow the single largest key value to be split over the number of partitions equal to the number of buckets, which will in turn distribute the data more evenly across your nodes.
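The salting steps can be sketched in plain Python as follows. This is a toy in-memory model rather than Spark code: the helper names (salt_left, explode_right, hash_join), the bucket count, and the sample tables are all assumptions for illustration. It shows that the salted join returns the same rows as the plain join while splitting the hot key into several smaller groups.

```python
import random
from collections import Counter

NUM_BUCKETS = 4  # hypothetical value; in practice tuned to the degree of skew


def salt_left(rows, num_buckets, rng):
    """Give every row of the large table a uniformly random bucket number."""
    return [{**row, "bucket": rng.randrange(num_buckets)} for row in rows]


def explode_right(rows, num_buckets):
    """Copy every row of the small table once per bucket."""
    return [{**row, "bucket": b} for row in rows for b in range(num_buckets)]


def hash_join(left, right, on):
    """Minimal inner join on the columns listed in `on`."""
    index = {}
    for r in right:
        index.setdefault(tuple(r[c] for c in on), []).append(r)
    return [{**l, **r} for l in left for r in index.get(tuple(l[c] for c in on), [])]


rng = random.Random(42)  # fixed seed so the sketch is reproducible

# Hypothetical data: 90 of 100 events share the key "hot".
events = [{"event_id": i, "key": "hot" if i < 90 else f"k{i}"} for i in range(100)]
labels = [{"key": "hot", "label": "popular"}] + [
    {"key": f"k{i}", "label": "rare"} for i in range(90, 100)
]

plain = hash_join(events, labels, on=["key"])
salted = hash_join(
    salt_left(events, NUM_BUCKETS, rng),
    explode_right(labels, NUM_BUCKETS),
    on=["key", "bucket"],
)

# Rows per join group, i.e. the unit of work that ends up in a single task.
plain_groups = Counter(row["key"] for row in plain)
salted_groups = Counter((row["key"], row["bucket"]) for row in salted)
```

The cost of the technique is visible in explode_right: the right-side table grows by a factor equal to the number of buckets, so salting pays off mainly when that table is small relative to the skewed one.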
Caching Data in Spark
The main advantage of Spark over Hadoop’s MapReduce approach (Spark’s ideological predecessor) is its ability to keep data in RAM during computation.
However, Spark also uses lazy evaluation for its execution plan, which means it postpones computing any intermediate results until absolutely necessary. This also means that those intermediate results are not kept in memory, since all Spark cares about is the final step of the plan. This works fine if you have a straight pipeline of transformations, where every step takes as its input the output of the previous one.
However, once an object is read more than once, this approach leads to it being recalculated on every read. To avoid that, you can specify which DataFrames should be cached in memory, and when, using the built-in cache() method.
Caching is itself a lazy operation, so it is performed only once the data is actually computed by an action.
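The interplay between lazy evaluation and caching can be illustrated with a small toy model in plain Python. The LazyDataset class below is hypothetical and far simpler than Spark’s execution engine; it only mimics three behaviours described above: transformations are recorded rather than run, every action recomputes the pipeline, and cache() takes effect only when the first action runs.

```python
class LazyDataset:
    """Toy stand-in for a lazily evaluated dataset (not Spark's implementation)."""

    def __init__(self, compute):
        self._compute = compute        # function that produces the rows
        self._cache_requested = False
        self._cached_rows = None

    def map(self, fn):
        # Transformation: only records the step, nothing runs yet.
        return LazyDataset(lambda: [fn(x) for x in self.collect()])

    def cache(self):
        # Lazy as well: only marks the dataset for caching.
        self._cache_requested = True
        return self

    def collect(self):
        # Action: triggers the computation, or reuses the cached rows.
        if self._cached_rows is not None:
            return self._cached_rows
        rows = self._compute()
        if self._cache_requested:
            self._cached_rows = rows
        return rows


calls = {"expensive": 0}


def expensive(x):
    """Pretend this is a costly transformation; count how often it runs."""
    calls["expensive"] += 1
    return x * x


source = LazyDataset(lambda: [1, 2, 3])

# Without caching, two actions run the transformation twice per row.
uncached = source.map(expensive)
uncached.collect()
uncached.collect()
calls_without_cache = calls["expensive"]

# With caching, nothing runs at cache() time; the first action fills the cache.
calls["expensive"] = 0
cached = source.map(expensive).cache()
calls_after_cache_call = calls["expensive"]
cached.collect()
cached.collect()
calls_with_cache = calls["expensive"]
```

In this model the uncached pipeline evaluates the transformation six times for three rows and two actions, while the cached one evaluates it three times. The same reasoning suggests caching only the datasets that are actually read more than once, since cached data occupies memory that other stages could use.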
Repartitioning with Apache Spark
It’s important to keep in mind how your manipulation techniques impact the data and how it’s stored across nodes.
For instance, when filtering a very large dataset, and ending up with a comparatively much smaller one fulfilling your filter expression, you might end up with some partitions being empty on some worker nodes. This leads to data being unevenly distributed across the network, which decreases the level of parallelism that can be achieved during processing of your new, filtered data.
The built-in repartition() and coalesce() methods let you change how the data is partitioned across worker nodes.
The repartition() method recreates the partitions from scratch and performs a full shuffle, leading to a uniform distribution of data. The coalesce() method merges existing partitions together, which results in less data movement than repartition() (parts of the data for the merged partitions are already in place), but it might lead to uneven partitions, which in turn leads to data skew.
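The difference between the two methods can be sketched with a toy model in plain Python, where a dataset is a list of partitions and each partition is a list of rows. The two functions below only imitate the behaviour described above and are assumptions for illustration: Spark’s real coalesce() chooses which partitions to merge based on where they are located, not by simple index arithmetic.

```python
def coalesce(partitions, n):
    """Merge existing partitions into n groups without splitting any of them."""
    merged = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        merged[i % n].extend(part)   # whole partitions move together
    return merged


def repartition(partitions, n):
    """Full shuffle: deal every row out round-robin into n new partitions."""
    rows = [row for part in partitions for row in part]
    fresh = [[] for _ in range(n)]
    for i, row in enumerate(rows):
        fresh[i % n].append(row)     # every row may move
    return fresh


# Hypothetical state after a selective filter: most partitions are empty.
filtered = [list(range(0, 10)), [], [10], [], [], [11], [], []]

sizes_coalesced = [len(p) for p in coalesce(filtered, 2)]        # [11, 1]
sizes_repartitioned = [len(p) for p in repartition(filtered, 2)]  # [6, 6]
```

Merging whole partitions keeps the ten-row partition intact, so the result stays lopsided, whereas the full shuffle evens the sizes out at the price of moving every row. Which cost is acceptable depends on how much work follows the repartitioning step.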
UDFs and Broadcast Variables
User-Defined Functions (UDFs) should be avoided from a performance point of view – they force the representation of data as objects in the JVM. This is a cost you pay for the ability to represent your transformations in a high-level programming language. Spark’s structured API is free of this limitation, and is generally the recommended way of performing data manipulations.
If you do want to use UDFs in your code, however, consider using Broadcast Variables. If you have a piece of data that will be used across multiple UDF calls, broadcasting it means that there’s a single read-only copy of the data on each node, and it doesn’t have to be re-sent with each UDF call. Broadcasting can be useful outside of the UDF use case as well, e.g. when using lookup tables.