Saturday, 26 January 2019

Poorman Spark monitoring

Spark exposes lots of metrics to give insight into what is happening inside a Spark application, but sometimes you are just looking for a few quick metrics on your application.

In this post I will share examples of metrics that can be collected quickly using simple patterns.

How long is my Spark application waiting for resource allocation?
I have always felt the need for this metric when running on a shared cluster with limited capacity allocated per user.
It is useful for knowing when a Spark job is stuck simply because the cluster is busy.

The pattern is very simple: start a timer thread before the SparkContext is created that monitors context creation and logs the elapsed time at a regular interval.

Code snippet for the monitor:
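A minimal sketch of what this monitor could look like, using the helper names monitoringExecutor, newCallable and checkSparkContext that appear in the submit call further down (polling SparkSession.getDefaultSession and the 5-second interval are my assumptions, not necessarily what the repo does):

import java.util.concurrent.{Callable, ExecutorService, Executors}

import org.apache.spark.sql.SparkSession

// single background thread used for all monitoring tasks
val monitoringExecutor: ExecutorService = Executors.newSingleThreadExecutor()

// small helper that turns a Scala block into a java.util.concurrent.Callable
def newCallable(body: => Unit): Callable[Unit] = new Callable[Unit] {
  override def call(): Unit = body
}

// poll until a SparkSession (and hence SparkContext) exists, logging how long we waited
def checkSparkContext: Unit = {
  val start = System.currentTimeMillis()
  while (SparkSession.getDefaultSession.isEmpty) {
    val waitedSec = (System.currentTimeMillis() - start) / 1000
    println(s"Still waiting for SparkContext ... $waitedSec seconds elapsed")
    Thread.sleep(5000) // check every 5 seconds, tune as needed
  }
  println("SparkContext created, resources were allocated")
}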

Just start the timer before the SparkContext is created, using the code below:

 monitoringExecutor.submit(newCallable(checkSparkContext))

How many records is my Spark job/stage processing?

This is based on the pattern of using a distributed counter to track how many records a stage has processed.
Spark has something called a LongAccumulator that can be used for capturing metrics like this.

So we need a block of code that just logs the value of the accumulator and takes some action if it is not moving fast enough.
Code snippet for tracking records processed:
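A minimal sketch of the pattern, reusing the monitoringExecutor and newCallable helpers from the previous snippet (the input path, the 30-second interval and the per-record work inside processData are placeholders of mine):

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

val sparkSession = SparkSession.builder().appName("poor-man-monitor").getOrCreate()

// named accumulator: the name also makes it visible to the Spark UI and listeners
val recordsProcessed: LongAccumulator =
  sparkSession.sparkContext.longAccumulator("recordsProcessed")

// runs on the driver: log the counter at a fixed interval and warn if it stalls
def monitorRecordsProcessed: Unit = {
  var last = 0L
  while (true) { // in real code, stop this loop once the job finishes
    val current: Long = recordsProcessed.value
    if (current == last) println("WARN: record counter is not moving")
    else println(s"Records processed so far: $current")
    last = current
    Thread.sleep(30000)
  }
}

// runs on the executors: every record bumps the accumulator
def processData(line: String): String = {
  recordsProcessed.add(1)
  line.toUpperCase // stand-in for the real per-record work
}

monitoringExecutor.submit(newCallable(monitorRecordsProcessed))

sparkSession.sparkContext
  .textFile("hdfs:///some/input/path") // hypothetical input
  .map(processData)
  .count() // any action that actually runs the stage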

monitorRecordsProcessed is submitted for async execution, and processData inside the map function increments the counter.

A note about accumulators: they are shared variables between the driver and the executors. If an accumulator is written on the executor side, there is a chance of multiple/double counting due to retries of failed stages.
So take that into account when dealing with accumulators: they are very good as a debugging tool or for giving interactive feedback on processing, but their values can contain some noise when jobs/stages are failing.

Spark itself uses accumulators to track internal stage metrics, and all of that is available on the Spark UI.

How do we get access to Spark's internal metrics?

Now we are getting into rich man's monitoring. Let's look at an example that gives access to the internal accumulators; Spark also exposes an API to get all the metrics during job execution.

The code below logs all the accumulators at stage level. The only rule is to give your accumulator a name, so that it shows up alongside Spark's internal metrics.
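A sketch of what such a listener could look like (the class name matches the addSparkListener call below; the log format is my own):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// On every stage completion, log all accumulators attached to the stage:
// Spark's internal metrics plus any user accumulator that was given a name.
class StageAccumulatorListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    println(s"Stage ${info.stageId} [${info.name}] completed, accumulators:")
    info.accumulables.values
      .filter(_.name.isDefined) // unnamed accumulators are skipped
      .foreach { acc =>
        println(s"  ${acc.name.get} = ${acc.value.getOrElse("n/a")}")
      }
  }
}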

Once the listener is defined, just add it to the SparkContext:

sparkSession.sparkContext.addSparkListener(new StageAccumulatorListener)

This listener will start showing all the accumulators for every completed stage in the application logs.

The record counter also comes up in this list because it was given a name.

Spark also gives API access to metrics during execution, and that can be used to build a proactive monitoring system.
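For example, the SparkStatusTracker available from the SparkContext can be polled while jobs are running. A small sketch, assuming the sparkSession from earlier (logActiveStages is a hypothetical helper, not something from the repo):

// poll Spark's status tracker for live task-level progress of the active stages
def logActiveStages(): Unit = {
  val tracker = sparkSession.sparkContext.statusTracker
  tracker.getActiveStageIds().foreach { stageId =>
    tracker.getStageInfo(stageId).foreach { stage =>
      println(
        s"Stage $stageId '${stage.name}': " +
        s"${stage.numCompletedTasks} of ${stage.numTasks} tasks done, " +
        s"${stage.numActiveTasks} active, ${stage.numFailedTasks} failed")
    }
  }
}

Calling this at a fixed interval from the monitoring thread gives a rough progress report without having to open the Spark UI.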

All the code used in this post is available in the poormonitor repo on GitHub.


