Thursday 22 November 2018

Spark Run local design pattern

Many Spark applications have now become legacy applications, and they are very hard to enhance, test, and run locally.

Spark has very good testing support, but many Spark applications are still not testable.
I will share one common error that you see when you try to run an old Spark application.

When you see such an error you have 2 options:
 - Accept that it can't be run locally and keep working with this frustration.
 - Fix it to run locally and show your team an example of The Boy Scout Rule.

I will show a very simple pattern that will save you from such frustration.

This code uses an isLocalSpark function to decide how to handle local mode. You can use any technique to make that decision, such as an environment variable, a command-line parameter, or anything else.

Once you know the job is running locally, create the Spark context accordingly.
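The decision step can be sketched without any Spark dependency. This is a minimal Java sketch, not the code from the runlocal repo: the `--local` argument, the `SPARK_RUN_LOCAL` variable, and the `masterFor` helper are hypothetical names chosen for illustration. Only `isLocalSpark` comes from the post.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;

final class RunLocalSketch {

    // Hypothetical convention: a "--local" argument or SPARK_RUN_LOCAL=true requests local mode.
    static boolean isLocalSpark(String[] args, Map<String, String> env) {
        boolean argFlag = Arrays.asList(args).contains("--local");
        boolean envFlag = "true".equalsIgnoreCase(env.get("SPARK_RUN_LOCAL"));
        return argFlag || envFlag;
    }

    // Returns the master URL to set explicitly, or empty when spark-submit should supply it.
    static Optional<String> masterFor(String[] args, Map<String, String> env) {
        return isLocalSpark(args, env) ? Optional.of("local[*]") : Optional.empty();
    }

    public static void main(String[] args) {
        Optional<String> master = masterFor(args, System.getenv());
        // With Spark on the classpath, the configuration would be built here, for example:
        //   SparkConf conf = new SparkConf().setAppName("my-app");
        //   master.ifPresent(conf::setMaster);
        System.out.println(master.map(m -> "local run, master=" + m)
                                 .orElse("master left to spark-submit"));
    }
}
```

Keeping the decision in one small function means the rest of the job never needs to know whether it was started from an IDE, a test, or the cluster.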

Now this code can run locally or via spark-submit.

Happy Spark Testing.

Code used in this blog is available @ runlocal repo

Sunday 18 November 2018

Insights from Spark UI

As a continuation of the anatomy-of-apache-spark-job post, I will share how you can use the Spark UI to tune a job.

I will continue with the same example that was used in the earlier post. The new Spark application does the following:

 - Read New York City parking tickets
 - Aggregate by "Plate ID" and calculate offence dates
 - Save the result

DAG for this code looks like this

This is a multi-stage job, so some data shuffle is required. For this sample, shuffle write is 564 MB and output is 461 MB.

Let's see what we can do to reduce this.
Let's take a top-down approach, starting from "Stage2". The first thing that comes to mind is to explore compression.

Current code

New Code

The new code only enables gzip on write. Let's see what the Spark UI shows.

Save with Gzip

With just a write encoder, the output size went down by about 70%. It is now 135 MB, and the job also runs faster.
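To get a feel for why gzip shrinks this kind of output so much, here is a small standalone Java sketch that compresses lines shaped like the job output. It only uses the JDK and is not the Spark code from the post; the `GzipSizeSketch` class and the sample rows are made up for illustration. In the Spark RDD API the switch itself is usually a matter of passing a Hadoop codec class such as `GzipCodec` to `saveAsTextFile`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

final class GzipSizeSketch {

    // Compresses a byte array in memory with gzip.
    static byte[] gzip(byte[] raw) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buffer)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Builds tab-separated lines shaped like the job output: plate, count, dates.
    // The values are synthetic, not taken from the parking ticket data set.
    static byte[] sampleOutput(int lines) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < lines; i++) {
            sb.append("PLATE").append(i).append('\t').append(2).append('\t')
              .append("07/29/2013,07/18/2013").append('\n');
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = sampleOutput(10_000);
        byte[] packed = gzip(raw);
        System.out.printf("raw=%d bytes, gzip=%d bytes%n", raw.length, packed.length);
    }
}
```

Text with repeated separators and date patterns compresses well, which is consistent with the drop seen in the Spark UI. The exact ratio depends on the data.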

Let's see what else is possible before we dive into tuning more of the internals.

The final output looks something like this:

1RA32   1       05/07/2014
92062KA 2       07/29/2013,07/18/2013
GJJ1410 3       12/07/2016,03/04/2017,04/25/2015
FJZ3486 3       10/21/2013,01/25/2014
FDV7798 7       03/09/2014,01/14/2014,07/25/2014,11/21/2015,12/04/2015,01/16/2015

The offence date is stored in raw format, so it is possible to apply a little encoding to it to get some more speed.

Java 8 added LocalDate to make date manipulation easy, and this class comes with some handy functions. One of them is toEpochDay.
This function converts a date to the number of days since 1970-01-01, so a 4-byte Int can cover millions of years. This is a big saving compared to the current format, which takes 10 bytes per date.

Code snippet with epochDay
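A minimal sketch of the encoding in Java, assuming the raw dates use the `MM/dd/yyyy` pattern seen in the sample output. The `EpochDaySketch` class and its `encode`/`decode` helpers are illustrative names, not the code from the repo.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

final class EpochDaySketch {

    // Assumed raw format, based on the sample output (for example 05/07/2014).
    private static final DateTimeFormatter RAW = DateTimeFormatter.ofPattern("MM/dd/yyyy");

    // Encodes a raw date string as the number of days since 1970-01-01.
    // toEpochDay returns a long; toIntExact fails loudly if it ever overflows an int.
    static int encode(String rawDate) {
        return Math.toIntExact(LocalDate.parse(rawDate, RAW).toEpochDay());
    }

    // Decodes the day count back into the raw string format.
    static String decode(int epochDay) {
        return LocalDate.ofEpochDay(epochDay).format(RAW);
    }

    public static void main(String[] args) {
        int encoded = encode("05/07/2014");
        System.out.println(encoded + " -> " + decode(encoded));
    }
}
```

The encoding is lossless for dates, so the original string can be rebuilt only at the point where the result is saved.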

Here is the Spark UI after this change. I have also made one more change, which is to use KryoSerializer.

This is a huge improvement: shuffle write went from 564 MB to 409 MB (27% better) and output from 134 MB to 124 MB (8% better).

Now let's go to another section of the Spark UI that shows logs from the executor side.
The GC logs for the above run show the following:

2018-10-28T17:13:35.332+0800: 130.281: [GC (Allocation Failure) [PSYoungGen: 306176K->20608K(327168K)] 456383K->170815K(992768K), 0.0222440 secs] [Times: user=0.09 sys=0.00, real=0.03 secs]
2018-10-28T17:13:35.941+0800: 130.889: [GC (Allocation Failure) [PSYoungGen: 326784K->19408K(327168K)] 476991K->186180K(992768K), 0.0152300 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]
2018-10-28T17:13:36.367+0800: 131.315: [GC (GCLocker Initiated GC) [PSYoungGen: 324560K->18592K(324096K)] 491332K->199904K(989696K), 0.0130390 secs] [Times: user=0.11 sys=0.00, real=0.01 secs]
2018-10-28T17:13:36.771+0800: 131.720: [GC (GCLocker Initiated GC) [PSYoungGen: 323744K->18304K(326656K)] 505058K->215325K(992256K), 0.0152620 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]
2018-10-28T17:13:37.201+0800: 132.149: [GC (Allocation Failure) [PSYoungGen: 323456K->20864K(326656K)] 520481K->233017K(992256K), 0.0199460 secs] [Times: user=0.12 sys=0.00, real=0.02 secs]
2018-10-28T17:13:37.672+0800: 132.620: [GC (Allocation Failure) [PSYoungGen: 326016K->18864K(327168K)] 538169K->245181K(992768K), 0.0237590 secs] [Times: user=0.17 sys=0.00, real=0.03 secs]
2018-10-28T17:13:38.057+0800: 133.005: [GC (GCLocker Initiated GC) [PSYoungGen: 324016K->17728K(327168K)] 550336K->259147K(992768K), 0.0153710 secs] [Times: user=0.09 sys=0.00, real=0.01 secs]
2018-10-28T17:13:38.478+0800: 133.426: [GC (Allocation Failure) [PSYoungGen: 322880K->18656K(326144K)] 564301K->277690K(991744K), 0.0156780 secs] [Times: user=0.00 sys=0.00, real=0.01 secs]
2018-10-28T17:13:38.951+0800: 133.899: [GC (Allocation Failure) [PSYoungGen: 323808K->21472K(326656K)] 582842K->294338K(992256K), 0.0157690 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]
2018-10-28T17:13:39.384+0800: 134.332: [GC (Allocation Failure) [PSYoungGen: 326624K->18912K(317440K)] 599490K->305610K(983040K), 0.0126610 secs] [Times: user=0.11 sys=0.00, real=0.02 secs]
2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) [PSYoungGen: 313824K->17664K(322048K)] 600522K->320486K(987648K), 0.0111380 secs] [Times: user=0.00 sys=0.00, real=0.02 secs]

Let's focus on one of the lines:

2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) [PSYoungGen: 313824K->17664K(322048K)] 600522K->320486K(987648K), 0.0111380 secs] [Times: user=0.00 sys=0.00, real=0.02 secs]

Heap usage before the minor GC was about 600 MB and about 320 MB after it, and the total heap size is about 987 MB.
The executor is allocated 2 GB and this Spark application is not using all of that memory, so we can put more load on each executor by sending more tasks or bigger tasks.
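Reading these numbers by eye gets tedious, so here is a small Java sketch that pulls the heap figures out of one log line. It assumes the ParallelGC line format shown above (`PSYoungGen`), and the `GcLineSketch` class is a hypothetical helper, not part of the post's repo.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class GcLineSketch {

    // Groups 1-3: young generation before, after, capacity.
    // Groups 4-6: whole heap before, after, capacity. All values are in K.
    private static final Pattern HEAP = Pattern.compile(
        "\\[PSYoungGen: (\\d+)K->(\\d+)K\\((\\d+)K\\)\\] (\\d+)K->(\\d+)K\\((\\d+)K\\)");

    // Returns {heapBeforeK, heapAfterK, heapCapacityK} for one minor GC line.
    static long[] heapUsage(String line) {
        Matcher m = HEAP.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("not a PSYoungGen line: " + line);
        }
        return new long[] {
            Long.parseLong(m.group(4)),
            Long.parseLong(m.group(5)),
            Long.parseLong(m.group(6))
        };
    }

    public static void main(String[] args) {
        String line = "2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) "
            + "[PSYoungGen: 313824K->17664K(322048K)] 600522K->320486K(987648K), 0.0111380 secs]";
        long[] heap = heapUsage(line);
        System.out.printf("before=%dK after=%dK capacity=%dK%n", heap[0], heap[1], heap[2]);
    }
}
```

Comparing the "after" value with the capacity across many lines gives a quick view of how much headroom the executor has.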

I will reduce the number of input partitions from 270 to 100.

With 270 input partition 

With 100 input partition

100 input partitions look better, with a bit over 10% less data to shuffle.

Other tricks
Now I will share some things that make a big difference in GC!

Code before optimization

Code after optimization

The new code does an optimized merge of sets: it adds the smaller set to the bigger one. It also introduces a case class.
Another optimization is in the save function, which uses mapPartitions and a StringBuffer to reduce object allocation.
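Both ideas can be shown outside Spark. The Java sketch below is an illustration under assumptions, not the code from the sparkperformance repo: `mergeInto` and `formatLine` are hypothetical helpers, and it uses StringBuilder, the unsynchronized sibling of the StringBuffer mentioned above.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

final class MergeSketch {

    // Adds the smaller set into the larger one and returns the larger set,
    // so fewer elements are copied. Note that this mutates one of the arguments.
    static <T> Set<T> mergeInto(Set<T> a, Set<T> b) {
        Set<T> big = a.size() >= b.size() ? a : b;
        Set<T> small = (big == a) ? b : a;
        big.addAll(small);
        return big;
    }

    // Formats one output line into a caller-supplied builder that is reset and reused,
    // instead of allocating a new builder and intermediate strings per record.
    static String formatLine(StringBuilder reuse, String plate, Set<Integer> epochDays) {
        reuse.setLength(0);
        reuse.append(plate).append('\t').append(epochDays.size()).append('\t');
        boolean first = true;
        for (int day : epochDays) {
            if (!first) {
                reuse.append(',');
            }
            reuse.append(day);
            first = false;
        }
        return reuse.toString();
    }

    public static void main(String[] args) {
        Set<Integer> big = new HashSet<>(Set.of(1, 2, 3));
        Set<Integer> small = new HashSet<>(Set.of(3, 4));
        Set<Integer> merged = mergeInto(small, big);
        // One builder would be shared by all records of a partition.
        StringBuilder shared = new StringBuilder();
        System.out.println(formatLine(shared, "1RA32", new TreeSet<>(merged)));
    }
}
```

In a mapPartitions-style function the builder would be created once per partition and passed to every call, which is where the allocation saving comes from.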

I used a GC log analysis tool to get some GC stats.

Before code change

After code change

The new code produces less garbage, for example:
 - Total GC 126 GB vs 122 GB (around 3% better)
 - Max GC time 720 ms vs 520 ms (around 28% better)

The optimization looks promising.

All the code used in this blog is available in the sparkperformance GitHub repo.

Stay tuned for more on this.

Saturday 10 November 2018

SQL is Stream

Using the Streams API in any language looks like writing SQL.

 - map is SELECT columns
 - filter is WHERE
 - count is COUNT(1)
 - limit is LIMIT X
 - collect is fetching all results on the client side

So it is very easy to map all the functions of Streams API to some part of SQL.
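The mapping can be seen on a plain in-memory list. This is a small Java sketch using the standard `java.util.stream` API; the `Price` class and its sample rows are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class StreamAsSqlSketch {

    // Hypothetical row type standing in for a table row.
    static final class Price {
        final String symbol;
        final double open;
        final long volume;

        Price(String symbol, double open, long volume) {
            this.symbol = symbol;
            this.open = open;
            this.volume = volume;
        }
    }

    static final List<Price> ROWS = Arrays.asList(
        new Price("AAA", 1100.0, 1_500_000L),
        new Price("BBB", 1120.0, 900_000L),
        new Price("CCC", 1090.0, 2_000_000L));

    // SELECT COUNT(1) FROM rows WHERE volume > ?
    static long countAbove(long minVolume) {
        return ROWS.stream()
                   .filter(p -> p.volume > minVolume)   // WHERE
                   .count();                            // COUNT(1)
    }

    // SELECT symbol FROM rows WHERE volume > ? LIMIT ?
    static List<String> symbolsAbove(long minVolume, int limit) {
        return ROWS.stream()
                   .filter(p -> p.volume > minVolume)   // WHERE
                   .map(p -> p.symbol)                  // SELECT symbol
                   .limit(limit)                        // LIMIT
                   .collect(Collectors.toList());       // fetch results on the client
    }

    public static void main(String[] args) {
        System.out.println(countAbove(1_000_000L));
        System.out.println(symbolsAbove(1_000_000L, 1));
    }
}
```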

Object-relational mapping frameworks (Hibernate, MyBatis, JPA, TopLink, ActiveRecord, etc.) give a good abstraction over SQL, but they add a lot of overhead and give little control over how the SQL is built, so many times you have to write native SQL anyway.


ORM never made writing SQL easy. If you don't trust me, take a quick look at how the code ends up looking.

Sometimes I feel that engineers are writing more annotations than real algorithms!

To implement any feature we have to keep switching between the SQL API and the non-SQL API. This makes code hard to maintain, and many times it is not optimal either.

This problem can be solved by a library that is based on the Streams API and can generate SQL. Then we don't have to switch, and it becomes a unified programming experience.

With such a library, testing becomes easy because the source of the stream can be changed as needed: in the real environment it is a database, and in tests it is an in-memory data structure.

In this post I will share a toy example of how such a library could look.

Code Snippet

Stream<StocksPrice> rows =; // the stream source is missing in the original post
long count = rows
                .filter(Where.GT("volume", 1467200))
                .filter(Where.GT("open_price", 1108d))
                .count();

Above code generates
Select Count(1) From stocks_price where volume > 1467200 AND open_price > 1108

Here is another example, this time with a limit:
                .filter(Where.GT("volume", 1467200))
                .filter(Where.GT("open_price", 1108d))

Select stock_symbol,open_price,high_price,trade_date FROM stocks_price WHERE volume > 1467200 AND open_price > 1108.0 LIMIT 2
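To make the idea concrete, here is a minimal Java sketch of how filters could be collected and turned into SQL text. It is a toy under stated assumptions, not the implementation from the streams repo: only `Where.GT` and the table and column names come from the snippets above, while `SqlStreamSketch`, `from`, `countSql`, and `selectSql` are hypothetical names. It returns the SQL string instead of executing it.

```java
import java.util.ArrayList;
import java.util.List;

final class SqlStreamSketch {

    // One predicate such as "volume > 1467200".
    static final class Where {
        final String column;
        final String op;
        final Object value;

        private Where(String column, String op, Object value) {
            this.column = column;
            this.op = op;
            this.value = value;
        }

        static Where GT(String column, Object value) {
            return new Where(column, ">", value);
        }

        // Values are inlined for readability only; real code should bind parameters.
        String toSql() {
            return column + " " + op + " " + value;
        }
    }

    private final String table;
    private final List<Where> predicates = new ArrayList<>();
    private int limit = -1; // -1 means no LIMIT clause

    private SqlStreamSketch(String table) {
        this.table = table;
    }

    static SqlStreamSketch from(String table) {
        return new SqlStreamSketch(table);
    }

    // Intermediate operations only record what was asked for.
    SqlStreamSketch filter(Where predicate) {
        predicates.add(predicate);
        return this;
    }

    SqlStreamSketch limit(int n) {
        this.limit = n;
        return this;
    }

    private String whereClause() {
        if (predicates.isEmpty()) {
            return "";
        }
        StringBuilder sb = new StringBuilder(" WHERE ");
        for (int i = 0; i < predicates.size(); i++) {
            if (i > 0) {
                sb.append(" AND ");
            }
            sb.append(predicates.get(i).toSql());
        }
        return sb.toString();
    }

    // Terminal operation: all recorded filters are merged into one query.
    String countSql() {
        return "SELECT COUNT(1) FROM " + table + whereClause();
    }

    String selectSql(String... columns) {
        String sql = "SELECT " + String.join(",", columns) + " FROM " + table + whereClause();
        return limit >= 0 ? sql + " LIMIT " + limit : sql;
    }

    public static void main(String[] args) {
        System.out.println(from("stocks_price")
            .filter(Where.GT("volume", 1467200))
            .filter(Where.GT("open_price", 1108d))
            .countSql());
    }
}
```

Because nothing runs until the terminal call, the two `filter` steps end up as a single WHERE clause, which is the merging of pipeline phases that makes an optimized native query possible.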

These APIs can also use code generation to give compile-time safety, such as checking column names, types, etc.


The Streams API also gives some other benefits:
 - Parallel execution.
 - Joins between database data and non-database data can be done easily using map.
 - It allows a pure streaming approach, which is good when dealing with huge data.
 - It opens up the option of generating an optimized native query, because multiple phases of the pipeline can be merged.

This programming model is not new. It is very common in distributed computing frameworks like Spark, Kafka, Flink, etc.

The Spark Dataset is based on this approach: it generates an optimized query by pushing filters down to storage, reducing reads by looking at partitions, reading only selected columns, etc.


Database drivers should offer a stream-based API; this would help reduce the dependency on ORM frameworks.
This is a very powerful programming model and it opens up lots of options.

Code used in this post is available @ streams github repo.