
Sunday, 18 November 2018

Insights from Spark UI

As a continuation of the anatomy-of-apache-spark-job post, I will share how you can use the Spark UI for tuning a job.

I will continue with the same example that was used in the earlier post. The new Spark application will do the below things (a rough sketch in Scala follows the list):

 - Read the New York City parking ticket data
 - Aggregate by "Plate ID" and collect the offence dates
 - Save the result
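Here is a rough sketch in Scala of what this pipeline could look like. This is not the exact code from the repo: the input path, the CSV column positions for "Plate ID" and the offence date, and the output path are all assumptions.

// Minimal sketch (not the repo code): aggregate offence dates per plate.
// Assumed: sc is the SparkContext, the CSV has "Plate ID" at index 1
// and the issue date at index 4.
val tickets = sc.textFile("nyc-parking-tickets.csv")

val offencesByPlate = tickets
  .map(_.split(","))
  .map(cols => (cols(1), Set(cols(4))))   // (plateId, one offence date)
  .reduceByKey(_ ++ _)                    // merge date sets per plate

offencesByPlate
  .map { case (plate, dates) => s"$plate\t${dates.size}\t${dates.mkString(",")}" }
  .saveAsTextFile("offences-by-plate")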

The DAG for this code looks like this (DAG screenshot from the Spark UI):

This is a multi-stage job, so some data shuffle is required. For this sample, the shuffle write is 564 MB and the output is 461 MB.

Let's see what we can do to reduce this.
Taking a top-down approach from "Stage 2", the first thing that comes to mind is to explore compression.

The only difference between the current code and the new code is enabling gzip compression on write. Let's see what the Spark UI shows.
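A minimal sketch of that change, assuming the result RDD is written with saveAsTextFile (the RDD name and paths are illustrative):

// Before: plain text output
result.saveAsTextFile("offences-by-plate")

// After: the same output, compressed with gzip on write
import org.apache.hadoop.io.compress.GzipCodec
result.saveAsTextFile("offences-by-plate-gz", classOf[GzipCodec])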

Save with Gzip (Spark UI screenshot):

With just the write encoder (gzip), the output went down by 70%. It is now 135 MB, and it also speeds up the job.

Let's see what else is possible before we dive into more internal tuning.

The final output looks something like below:

1RA32   1       05/07/2014
92062KA 2       07/29/2013,07/18/2013
GJJ1410 3       12/07/2016,03/04/2017,04/25/2015
FJZ3486 3       10/21/2013,01/25/2014
FDV7798 7       03/09/2014,01/14/2014,07/25/2014,11/21/2015,12/04/2015,01/16/2015

The offence date is stored in a raw format; it is possible to apply a little encoding to it to get some more speed.

Java 8 added LocalDate to make date manipulation easy, and this class comes with some handy functions; one of them is toEpochDay.
This function converts a date to the number of days since 1970, which means that in 4 bytes (an Int) we can store up to around 5K years of dates. This is a big saving compared to the current format, which takes 10 bytes.

Code snippet with epochDay:
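Since the snippet itself is a screenshot, here is a minimal sketch of the idea, assuming the date string uses the MM/dd/yyyy format seen in the output above:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

val formatter = DateTimeFormatter.ofPattern("MM/dd/yyyy")

// Store the offence date as days since 1970 (fits in an Int)
// instead of a 10-byte string.
def toEpochDay(date: String): Int =
  LocalDate.parse(date, formatter).toEpochDay.toInt

// Convert back when writing the final output.
def fromEpochDay(day: Int): String =
  LocalDate.ofEpochDay(day.toLong).format(formatter)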

The Spark UI after this change. I have also made one more change: using KryoSerializer.
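Enabling Kryo is a one-line configuration change; a sketch (the app name is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("nyc-parking-tickets")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")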

This is a huge improvement: shuffle write changed from 564 MB to 409 MB (27% better) and output from 134 MB to 124 MB (8% better).

Now let's go to another section of the Spark UI that shows logs from the executor side.
The GC logs for the above run show the following:

2018-10-28T17:13:35.332+0800: 130.281: [GC (Allocation Failure) [PSYoungGen: 306176K->20608K(327168K)] 456383K->170815K(992768K), 0.0222440 secs] [Times: user=0.09 sys=0.00, real=0.03 secs]
2018-10-28T17:13:35.941+0800: 130.889: [GC (Allocation Failure) [PSYoungGen: 326784K->19408K(327168K)] 476991K->186180K(992768K), 0.0152300 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]
2018-10-28T17:13:36.367+0800: 131.315: [GC (GCLocker Initiated GC) [PSYoungGen: 324560K->18592K(324096K)] 491332K->199904K(989696K), 0.0130390 secs] [Times: user=0.11 sys=0.00, real=0.01 secs]
2018-10-28T17:13:36.771+0800: 131.720: [GC (GCLocker Initiated GC) [PSYoungGen: 323744K->18304K(326656K)] 505058K->215325K(992256K), 0.0152620 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]
2018-10-28T17:13:37.201+0800: 132.149: [GC (Allocation Failure) [PSYoungGen: 323456K->20864K(326656K)] 520481K->233017K(992256K), 0.0199460 secs] [Times: user=0.12 sys=0.00, real=0.02 secs]
2018-10-28T17:13:37.672+0800: 132.620: [GC (Allocation Failure) [PSYoungGen: 326016K->18864K(327168K)] 538169K->245181K(992768K), 0.0237590 secs] [Times: user=0.17 sys=0.00, real=0.03 secs]
2018-10-28T17:13:38.057+0800: 133.005: [GC (GCLocker Initiated GC) [PSYoungGen: 324016K->17728K(327168K)] 550336K->259147K(992768K), 0.0153710 secs] [Times: user=0.09 sys=0.00, real=0.01 secs]
2018-10-28T17:13:38.478+0800: 133.426: [GC (Allocation Failure) [PSYoungGen: 322880K->18656K(326144K)] 564301K->277690K(991744K), 0.0156780 secs] [Times: user=0.00 sys=0.00, real=0.01 secs]
2018-10-28T17:13:38.951+0800: 133.899: [GC (Allocation Failure) [PSYoungGen: 323808K->21472K(326656K)] 582842K->294338K(992256K), 0.0157690 secs] [Times: user=0.09 sys=0.00, real=0.02 secs]
2018-10-28T17:13:39.384+0800: 134.332: [GC (Allocation Failure) [PSYoungGen: 326624K->18912K(317440K)] 599490K->305610K(983040K), 0.0126610 secs] [Times: user=0.11 sys=0.00, real=0.02 secs]
2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) [PSYoungGen: 313824K->17664K(322048K)] 600522K->320486K(987648K), 0.0111380 secs] [Times: user=0.00 sys=0.00, real=0.02 secs]

Let's focus on one of the lines:

2018-10-28T17:13:39.993+0800: 134.941: [GC (Allocation Failure) [PSYoungGen: 313824K->17664K(322048K)] 600522K->320486K(987648K), 0.0111380 secs] [Times: user=0.00 sys=0.00, real=0.02 secs]

The heap before the minor GC was 600 MB and after it 320 MB, and the total heap size is 987 MB.
The executor is allocated 2 GB, so this Spark application is not using all of the memory; we can put more load on the executor by sending more tasks or bigger tasks.

I will reduce the input partitions from 270 to 100.
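The partition count can be controlled when reading the input, or by reducing the partitions of an existing RDD; a sketch (assuming the input is read with textFile):

// Ask for roughly 100 partitions while reading the input...
val tickets = sc.textFile("nyc-parking-tickets.csv", minPartitions = 100)

// ...or shrink an existing RDD's partitions without a full shuffle.
val fewer = tickets.coalesce(100)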


With 270 input partitions:


With 100 input partitions:

100 input partitions look better, with around 10%+ less data to shuffle.

Other tricks
Now I will share some of the things that make a big difference in GC!

Code before optimization

Code after optimization

The new code does an optimized merge of sets, adding the small set to the big one, and it also introduces a case class.
Another optimization is in the save function, where mapPartitions is used together with a StringBuffer to reduce object allocation.
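Since the before/after snippets are screenshots, here is a rough sketch of the two ideas (the case class, field names and output format are assumptions, not the repo code):

import scala.collection.mutable

// Assumed shape of the aggregated record.
case class PlateOffences(plateId: String, days: mutable.Set[Int])

// Merge the smaller set into the bigger one to reduce copying and garbage.
def merge(a: mutable.Set[Int], b: mutable.Set[Int]): mutable.Set[Int] = {
  val (big, small) = if (a.size >= b.size) (a, b) else (b, a)
  big ++= small
  big
}

// Save with mapPartitions so one StringBuffer is reused per partition
// instead of allocating a new builder per record.
def save(records: org.apache.spark.rdd.RDD[PlateOffences], path: String): Unit =
  records.mapPartitions { iter =>
    val buffer = new StringBuffer()
    iter.map { r =>
      buffer.setLength(0)
      buffer.append(r.plateId).append("\t")
        .append(r.days.size).append("\t")
        .append(r.days.mkString(","))
      buffer.toString
    }
  }.saveAsTextFile(path)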

I used http://gceasy.io to get some GC stats.

Before code change


After code change

The new code is producing less garbage, for example:
 - Total GC: 126 GB vs 122 GB (around 4% better)
 - Max GC time: 720 ms vs 520 ms (around 25% better)

Optimization looks promising.

All the code used in this blog is available in the github repo sparkperformance.

Stay tuned for more on this.

Saturday, 10 November 2018

SQL is Stream


The Stream API in any language looks like writing SQL.

map is SELECT columns
filter is WHERE
count is COUNT(1)
limit is LIMIT X
collect is getting all the results on the client side

So it is very easy to map the functions of the Streams API to some part of SQL.

Object-relational mapping frameworks (Hibernate, MyBatis, JPA, TopLink, ActiveRecord, etc.) give a good abstraction over SQL, but they add a lot of overhead, do not give much control over how the SQL is built, and many times you have to write native SQL anyway.


ORM never made writing SQL easy, and if you don't trust me then take a quick look at how the code looks.

Sometimes I feel that engineers are writing more annotations than real algorithms!

To implement any feature we have to keep switching between the SQL API and the non-SQL API; this makes the code hard to maintain, and many times it is also not optimal.

This problem can be solved by a library that is based on the Streams API and can generate SQL; then we don't have to switch, and it becomes a unified programming experience.

With such a library, testing becomes easy because the source of the stream can be changed as needed: in a real environment it is the database, and in tests it is an in-memory data structure.

In this post I will share a toy example of what such a library could look like.

Code Snippet

Stream<StocksPrice> rows = stocksTable.stream();
long count = rows
                .filter(Where.GT("volume", 1467200))
                .filter(Where.GT("open_price", 1108d))
                .count();

The above code generates:
Select Count(1) From stocks_price where volume > 1467200 AND open_price > 1108

Look at another example, with limit:

stocksTable.stream()
                .filter(Where.GT("volume", 1467200))
                .filter(Where.GT("open_price", 1108d))
                .limit(2)
                .collect(Collectors.toList());

Select stock_symbol,open_price,high_price,trade_date FROM stocks_price WHERE volume > 1467200 AND open_price > 1108.0 LIMIT 2

This API can also use code generation to give compile-time safety, like checking column names, types, etc.

Benefits

The Streams API gives some other benefits, like:
 - Parallel execution
 - Joins between database data and non-database data can easily be done using map.
 - Allows a pure streaming approach, which is good when dealing with huge data.
 - Opens up options for generating native optimized queries, because multiple phases of the pipeline can be merged.

This programming model is not new; it is very common in distributed computing frameworks like Spark, Kafka, Flink, etc.

The Spark Dataset is based on this approach: it generates optimized queries by pushing filters down to storage, reducing reads by looking at partitions, reading only selected columns, etc.

Conclusion

Database drivers should provide a stream-based API, and this will help reduce the dependency on ORM frameworks.
This is a very powerful programming model and opens up lots of options.

The code used in this post is available @ the streams github repo.

Friday, 26 October 2018

Broken promise of Agile

The Agile Manifesto was written 17 years back (i.e. in 2001); has it been able to bring change to the industry?



I would say yes, but not in the way the authors wanted.

Many consulting companies made millions of dollars, but as a software engineer I did not see the change.

How did Agile break its promise?

I will list the key things that the authors wanted, so we have some context to discuss this:

Individuals and interactions over processes and tools
Working software over comprehensive documentation

Customer collaboration over contract negotiation

Responding to change over following a plan



This looks so good :-)

Let's start with the education industry.

"Attend Agile training for 2 days and you are certified Scrum master or Agile Developer"

What did people learn after these workshops? Standups, planning poker, retrospectives, backlog grooming, JIRA and many more things.

One thing that is missing is the "Agile mindset"; no one can teach or learn this in 2 days, so it is a big joke that your team went to an expensive training and is now an Agile Team.

Let's go over the main items to see how the industry sees the main goals of Agile.

  • Individuals and interactions over processes and tools
Almost all teams got this wrong: they thought we need more tools, so JIRA was born and the industry created big ceremonies (backlog grooming, standup, retro, etc.).
I think having too many things in JIRA tells you only one thing: software quality is bad, or the team is too slow to deliver features, or the product team is creating a shopping list of features that no one ever wanted.

Now it has gone to the extent that you need multiple product owners just for grooming sessions, scrum masters for running retrospective meetings, and on top of that project managers to track story points/burn-downs, etc.

The team is spending more time on the JIRA board rather than talking to each other. We killed individuals & interactions.

How many tools or processes have we added? It is countless.

  • Working software over comprehensive documentation
People got confused by this and said "I need working software + documents". Now nothing gets done properly, and some teams will come and say they are "AgileWater", meaning they build software fast but also produce the documents required for waterfall.
Developers end up doing more non-productive work.

Working software was also taken to mean the team is allowed to release crap in production "because we are Agile".
What was meant by working software was an MVP, not a 20% or 40% developed item; a feature is either done or not done, there is nothing like 50% done when that feature is released to production.

The team is put under so much pressure to release that they end up taking shortcuts, and to address that a "Tech Debt" drive is required.

When the software team goes to product to get funds/approval to fix all the tech debt, the product team asks why you developed & deployed a crap piece of software in the first place.
  • Customer collaboration over contract negotiation
This is a classic: customers used Agile for blackmailing, or to question the team's credentials in order to push in last-minute changes. It gives them a licence to change the requirements at any time.
Not to miss the commitment that is taken in the form of story points; if the team misses that, then they are expected to put in extra hours.

The dev team is no different: they use this as an excuse to build bad-quality software.

  • Responding to change over following a plan
Agile never said to build features without tests, without architecture or without a plan.
Planning is a must and so is good architecture, but it should be just enough to move in the right direction; it is continuous architecture with no end state.

Teams took this to mean no design or architecture, in the name of responding to change.

Conclusion

One of my favorite questions about sprints is:

"How long is your sprint?"
 "2/3 weeks?"

"Why 2 weeks, why not 1 month or 1 year? Who decided this?"
"It is written in the Agile manifesto", or "my manager told us", or "I don't know, other teams are doing this".

"How long does a ticket sit in the backlog before making it to production?"
"I don't know, check with my TPM", or if someone knows they will come and say 1 year or 3 years.


Agile was about giving the team the freedom to choose the sprint size based on when they are ready for feedback, or when the customer is ready to give feedback.
A sprint can be 1 week or 6 months, but the key point is that you should get feedback after it and adjust.
If customers are not in the feedback loop, then go back to waterfall.

Another thing was Software Craftsmanship.
Agile projects have so many non-developers (project managers, scrum masters, Agile coaches) that they don't value craftsmanship, so we developers start new conferences on this and get even more disconnected.

Agile was written by developers for developers, but now we are out and this place is being taken over by non-developers.

At an Agile conference, ask the question "how many developers?"
You will see few hands :-( because they are at some other craftsmanship conference.

Agile projects are about project management, dates, money, time.
Managers make sure that the plan is made by them and followed by the team.

Today all projects are agile, but they still fail, go over budget, and are never on time.

Any process like Agile has one hidden form of feedback called "dissatisfaction", and you need to respond to that change to become better.

Our software industry has three inevitable things:
 - Degradation
 - Dysfunction
 - Expiry

Degradation -> Maintaining & transformation
Dysfunction -> Innovation & challenge
Expiry -> Creating & starting over

Degradation, dysfunction & expiry apply to people, projects, teams, processes, strategies and organizations.

Agile is no different: identify the phase and create version 2 of the process, or find a new one that works.


Tuesday, 23 October 2018

Simple Testing Can Prevent Most Critical Failures

Error handling is one of the hardest and most ignored parts of software development, and if the system is distributed then it becomes even harder.

A nice paper has been written on the topic Simple Testing Can Prevent Most Critical Failures.
Every developer should read this paper. I will try to summarize the key takeaways, but I suggest reading the paper to get more details.


Distributed system outages are common, and some recent examples are:

 - YouTube was down in Oct 2018 for around 1+ hour
 - Amazon was down during Prime Day in July 2018
 - Google services like Maps, Gmail and YouTube were down numerous times in 2018
 - Facebook was also down, apart from the many data leak issues they are facing


This paper talks about catastrophic failures that happened in distributed systems like Cassandra, HBase, HDFS, Redis and MapReduce.

As per the paper, most of the errors are due to 2 reasons:

 - Failures happen due to a complex sequence of events
 - Catastrophic errors are due to incorrect error handling
 - I will add a 3rd one, "ignoring design pressure", which I wrote about in the design-pressure-on-engineering-team post

Example from an HBase outage:

1 - Load balancer transfers region R from Slave A to Slave B
2 - Slave B opens region R
3 - Master deletes the current Zookeeper znode for region R after it is owned by Slave B
4 - Slave B dies
5 - Region R is assigned to Slave C & Slave C opens the region
6 - Master tries to delete the Slave B znode on Zookeeper; because Slave B is down, the whole cluster goes down due to wrong error handling code

In the above example, the sequence of events matters to reproduce the issue.

HDFS failure when a block is not replicated:

In this example also the sequence of events matters, and when a new data node starts it exposes a bug in the system.

The paper has many more examples.

Root cause of errors

92% of the catastrophic errors happen due to incorrect error handling.
What this means is that the error was detected but the error handling code was not good. Does this sound like a lot of the projects you have worked on?

1 - Errors are ignored
This is the reason for 25% of the failures; I think the number will be higher in many live systems.

An example of such an error:
catch(RebootException e) {
   log.info("Reboot occurred....");
}

Yes, this harmless-looking log statement is swallowing the exception and is a very common anti-pattern of error handling.

2 - Over-catching exceptions
This is also very common, like having a generic catch block and bringing down the whole system:

catch(Throwable e) {
     cluster.abort();
}

3 - TODO/FIXME in comments
Yes, real distributed systems in production also have lots of TODO/FIXME markers in critical sections of code.

Some other examples of error handling:

} catch (IOException e) {
  // will never happen
}

} catch (NoTransitionException e) {
  /* Why this can happen? Ask God not me. */
}

try { tableLock.release(); }
catch (IOException e) {
  LOG("Can't release lock", e);
}

4 - Feature development is prioritized
I think all software engineers will agree with this. This is also called tech debt, and I can't think of a better example than the Knight Capital bankruptcy, which was due to config & experimental code.

Conclusion

All these errors are complex to reproduce, but better unit tests would definitely catch them; this also shows that the unit/integration tests done in many systems do not test scenarios like a service going down and coming back again, and how that impacts the system.

Based on the above examples it might look like all the errors are due to Java checked exceptions, but it is no different in other languages like C/C++, which have no checked exceptions; everything is unchecked, and it is the developer's responsibility to check for errors in various places.

On a side note, a language with no static type system, like Python, makes it very easy to write code that will break at runtime, and if you are really unlucky then the error handling code will have some type error and it will get tested in production.

Also, almost all products have some static analysis tool (e.g. FindBugs) integrated, but these tools do not give much importance to such error handling anti-patterns.


Links to the issues mentioned in the paper:
HDFS
MapReduce
HBase
Redis
Cassandra

Please share more anti-patterns you have seen in production systems.
Till then, happy unit testing.



Friday, 19 October 2018

When microservices become darkservices

Microservices are great, and many companies come and talk about how they are used for scaling teams, products, etc.

Microservices also have a dark side, and as a programmer you should know about it before going on the ride.
In this post I will share some of the myths/dark sides of microservices.

  • We need lots of microservices
Before you create any new microservice, think about distributed computing, because most microservices are remote processes. First define what "micro" means in the problem context: it could be lines of code, features, deployment, etc.

  • Naming microservices will be easy
Computer science has only 2 hard problems and one of them is "naming"; you will very soon run out of options when you have 100s of services.

  • Non-functional requirements can be done later
Suddenly non-functional requirements (latency, throughput, security, reliability, etc.) become very important from day one.
  • Polyglot programming/persistence or something poly...
Software engineers like to try the latest cutting-edge tools, so they get carried away by the myth that we can use any language, any framework or any persistence.
Think about the skills and maintenance overhead required for every poly... thing that is added; if you have more than 2 or 3, it is not going to fit in your head and you will have to be on pager duty.
  • Monitoring is easy
This is one of the most ignored facts about microservices and it comes as an afterthought.
For a simple investigation you have to log in to many machines, look in logs, make sure you get the timing right across servers, etc.

Without proper monitoring tools you can't do this; you need ELK- or DataDog-type tooling.
  • Reads and writes are easy
This also gets ignored: now you are in the distributed-transactions world, which is not a good place to be, and to handle it you need an eventually consistent system or a non-available system.

  • Everything is secure
Now one service is talking to other services using APIs, so you need a good auth system to make sure your system is secure. If you work on a financial system then you will be spending more time answering security-related questions.
  • My service will always be up
That will never happen, no matter how good a programmer or infrastructure you have; services will go down, and now you are in middleware land (Kafka, ActiveMQ, ZeroMQ, etc.) so that requests can be queued while a service is not available.

  • I can add a breakpoint to debug it
This is just not possible, because now you are in a remote process and don't know how many microservices are involved in a single request.
  • Testing will be the same
Testing is never the same as with a monolith; you need better automated tests to get out of testing hell.
  • No code duplication
As you add more services, code sharing becomes hard because any change in common code requires good testing, and to avoid that many teams start duplicating code.
  • JSON over HTTP
This is one of the biggest myths: that every microservice must expose JSON over HTTP and be user facing.
This has resulted in an explosion of REST-based APIs for every microservice, and it is a reason why many systems are slow: they use a text-based protocol with no type information.

One thing you want to take away from these microservice anti-patterns is to rethink whether you really need JSON/REST for every service, or whether you can use another optimized protocol and encoding.
  • Versioning is my grandfather's job
Since most microservices are remote processes, you have to come up with a request/response spec and manage versions for backward compatibility.
  • Team communication remains the same
This is the hidden elephant in the room: with more services, more team communication is required to keep everyone posted about what the current version is, where it is running, what is broken, etc.
You can end up with more silos because no one knows the whole system.

  • Your product is at google/facebook/netflix scale
This is like buying a lottery ticket that you are never going to win.


If you can't write a decent modular monolith then don't try microservices, because it is all about getting coupling and cohesion right. Modules should be loosely coupled and highly cohesive.

There is no free lunch with microservices, and if you get it wrong then you will be paying a premium price :-)




Thursday, 18 October 2018

Setup SSL in Jetty

Have you faced issues when you had to quickly enable SSL and got stuck with it :-(
You are not alone; I will share my pain and some learnings.

I will share the steps to enable SSL on Jetty.

Warning: Use the below instructions only for a dev setup; for production, contact your security expert!

  • Install jetty on your server

  • Set up some env variables for convenience, like

              export jetty_home=.../somejetty
              export jetty_base=.../your_application_install_location

              It is recommended to keep jetty_base outside of the jetty installation, otherwise you will have a classpath nightmare

  • Execute the below command to create the initial setup for SSL

             java -jar $jetty_home/start.jar --add-to-startd=ssl jetty.base=$jetty_base

        Once you run the above command, you will see something like the below on the console.



  • Add the below line to ${jetty.base}/start.d/ssl.ini
        --module=https

    Check the ssl port (jetty.ssl.port) and change it accordingly

  • Add the below line in ${jetty.base}/start.ini
         jetty.ssl.port=port
         Use the same port as in the ssl.ini file.

  • Start the server
        java -jar $jetty_home/start.jar jetty.base=$jetty_base

     You are done :-) Jetty starts on SSL.

Magic Questions

     - Which certificate is used by jetty?
     That is the magic: jetty ships with a certificate that is already imported into the keystore that jetty uses.
     Jetty looks for the keystore in the $jetty_base/etc/keystore location.

     - What is the password of the keystore?

       The keystore password is in $jetty_base/start.d/ssl.ini, but it is obfuscated. You can use the below command to get the password:
      java -cp jetty-util-9.2.14.v20151106.jar org.eclipse.jetty.util.security.Password "OBF:1vny1zlo1x8e1vnw1vn61x8g1zlu1vn4"

      It is "storepwd".

 - How to see what is in the keystore? Run the below command and enter the password:
   keytool --list -v -keystore keystore

    If jetty gives an error like the password is wrong or the keystore was tampered with, then copy the keystore from $jetty_home/etc/keystore to $jetty_base/etc


It takes only 5 minutes to perform all the steps, but only if you know them; otherwise it is a day-long frustration. Enjoy development with jetty.

Sunday, 30 September 2018

Anatomy of Apache Spark Job

Apache Spark is a general-purpose, large-scale data processing framework. Understanding how Spark executes jobs is very important for getting the most out of it.

A little recap of Spark's evaluation paradigm

Spark uses a lazy evaluation paradigm in which the Spark application does not do anything until the driver calls an "Action".

Lazy eval is the key to all the runtime/compile-time optimizations Spark can do with it.
Lazy eval is not a new concept; it has been used in functional programming for decades, and databases also use it for creating logical & physical execution plans.

Neural network frameworks like TensorFlow are also based on lazy eval: first they build the compute graph and then execute it.


A Spark application is made up of jobs, stages & tasks. Jobs & tasks are executed in parallel by Spark, but the stages inside a job are sequential.
Knowing what executes in parallel and what in sequence is very important when you want to tune Spark jobs.

Stages are executed in order, so a job with many stages will choke on them; each stage feeds the next one, and this comes with some overhead that involves writing the stage output to a persistent source (i.e. disk, HDFS, S3, etc.) and reading it again. This is also called a wide transformation/shuffle dependency.

A job with a single stage will be very fast, but you can't build any useful application using a single stage.

Examples
Let's see a code example to understand this better (a sketch follows).
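Since the original snippet is a screenshot, here is an illustrative example (not the exact repo code) of a job whose DAG has this shape:

// Stage 1: read + filter + map (narrow transformations, merged into one stage)
// Stage 2: reduceByKey (shuffle boundary)
// Stage 3: sortByKey (another shuffle boundary)
val counts = sc.textFile("some-input.txt")      // illustrative input path
  .filter(line => line.nonEmpty)
  .map(line => (line.split(",")(0), 1))
  .reduceByKey(_ + _)
  .sortByKey()

counts.collect()                                // the action that triggers the job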

Spark DAG


This DAG view from the Spark UI makes it very clear how Spark sees/executes the application.
The above code creates 3 stages, and every stage boundary has some overhead (shuffle read/write).
Steps within a single stage are merged; for example, stage 1 has filter & map merged.

This view also has "Tasks", which is the smallest unit of work that is executed. This application has 2 tasks per stage.


How a Spark application is executed
Let's deep dive into how it is executed. A Spark application needs 3 components to execute:

  • Driver - Submits the request to the master and coordinates all the tasks.
  • Cluster Manager - Launches Spark executors based on requests from the driver.
  • Executor - Executes tasks and sends the results back to the driver.

The 2 important components involved in a Spark application are the driver & the executor; a Spark job can fail when any of these components is under stress, be it memory, CPU, network or disk.

In the next section I will share some of my experience with issues on the executor side.

Executor Issues
Each executor needs 2 parameters: cores & memory.
Cores decide how many tasks that executor can process, and memory is shared between all the cores/tasks in that executor.

Each Spark job has a different type of requirement, so it is an anti-pattern to use a single config for all Spark applications (an example config follows).
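As an illustration, cores and memory are usually chosen per application at submit time; a sketch with made-up values:

import org.apache.spark.SparkConf

// Illustrative values only: pick cores & memory per application,
// not one global config for every job.
val conf = new SparkConf()
  .set("spark.executor.cores", "4")       // tasks one executor can run in parallel
  .set("spark.executor.memory", "2g")     // shared by all tasks on that executor
  .set("spark.executor.instances", "2")   // number of executors (on YARN)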

Issue 1 - Task too big for the executor
The executor will fail to process the task, or run slowly, if the task is too big to fit in memory.
A few things to look for when this is the issue:
  • Long pauses in the driver log file (i.e. the log file not moving)
  • GC time is too long; this can be verified from the "executors" page on the Spark UI


  • Retry of Stage



  • Executor Log full of "spilling in-memory map" message
2018-09-30 03:30:06 INFO  ExternalSorter:54 - Thread 44 spilling in-memory map of 371.0 MB to disk (6 times so far)
2018-09-30 03:30:24 INFO  ExternalSorter:54 - Thread 44 spilling in-memory map of 379.5 MB to disk (7 times so far)
2018-09-30 03:30:38 INFO  ExternalSorter:54 - Thread 44 spilling in-memory map of 373.8 MB to disk (8 times so far)
2018-09-30 03:30:58 INFO  ExternalSorter:54 - Thread 44 spilling in-memory map of 384.0 MB to disk (9 times so far)
2018-09-30 03:31:17 INFO  ExternalSorter:54 - Thread 44 spilling in-memory map of 382.7 MB to disk (10 times so far)
2018-09-30 03:31:38 INFO  ExternalSorter:54 - Thread 44 spilling in-memory map of 371.0 MB to disk (11 times so far)
2018-09-30 03:31:58 INFO  ExternalSorter:54 - Thread 44 spilling in-memory map of 371.0 MB to disk (12 times so far)

  • Executor log with OOM error
2018-09-30 03:34:35 ERROR Executor:91 - Exception in task 0.0 in stage 3.0 (TID 273)
java.lang.OutOfMemoryError: GC overhead limit exceeded
 at java.util.Arrays.copyOfRange(Arrays.java:3664)
 at java.lang.String.<init>(String.java:207)
 at java.lang.StringBuilder.toString(StringBuilder.java:407)
 at sun.reflect.MethodAccessorGenerator.generateName(MethodAccessorGenerator.java:770)
 at sun.reflect.MethodAccessorGenerator.generate(MethodAccessorGenerator.java:286)
 at sun.reflect.MethodAccessorGenerator.generateSerializationConstructor(MethodAccessorGenerator.java:112)



How to solve this?

One option that comes to mind quickly is to increase memory on the executor side, and it works, but there is a limit on how much memory you can add; very soon you will run out of this option, because most clusters are shared and have a limit on the max memory that can be allocated to an executor.

The next, better option is to make individual tasks small, and that is all in your control. This has the tradeoff of more shuffle, but it is still better than the previous option (a sketch follows).
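A sketch of that second option: increase the number of partitions so each task handles less data (the RDD name "pairs" and the number 400 are illustrative):

// Option A: raise the partition count of the shuffle itself,
// so each reduce task gets less data.
val aggregated = pairs.reduceByKey(_ + _, numPartitions = 400)

// Option B: repartition before the heavy step (adds an extra shuffle).
val repartitioned = pairs.repartition(400)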

Spark UI snapshots for a bad run & a good run.

Bad run

Good run

The second one has an adjusted partition size. The bad run has all the indicators that it needs tuning of the partition size.

Issue 2 - Too many cores in the executor

This is also a very common problem, because we want to overload the executor by throwing too many tasks at it.
Let's see how to spot whether this is the issue:

  • Time spent on GC on the executor side
  • Executor log with the "spilling in-memory map" message
  • Peak execution memory on the executor during task execution. This is only available while the job is running, not on the history server.

I will put 2 snapshots from the Spark UI:

Run     Partitions   Executors   Cores   Memory
Run 1   100          2           4       2g
Run 2   100          2           2       2g

4 cores / 2 executors

2 cores / 2 executors

With 8 cores (4 * 2 executors), one is busy with GC overhead; with 4 cores (2 * 2 executors), everything cuts down by half, so it is more efficient to use just 4 cores.

If you see patterns like these, then reduce the executor cores and increase the number of executors to make the Spark job faster.

Issue 3 - YARN memory overhead

This is my favorite, and the below error confirms that the Spark application is having this issue:
"ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits.
XXX GB of XXX GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead"

Whenever this error comes up, most developers go to Stack Overflow and increase the "spark.yarn.executor.memoryOverhead" parameter value.
This is an ok option for the short term, but it will fail again soon; you will keep on increasing it and finally run out of options.

I consider increasing "spark.yarn.executor.memoryOverhead" an anti-pattern, because whatever memory is specified is added to the total memory of the executors.
This error means the executor is overloaded, and the best option is to try the other solutions mentioned above.

Spark has so many tuning parameters that sometimes it looks like sitting in a plane cockpit.

All the code used in this blog is available @ the sparkperformance github repo