
Sunday, 10 February 2019

Adaptive scheduling of Spark Job using YARN API

In the last post, Poorman Spark monitoring, I shared an approach for figuring out how long a Spark job waits for resources.

This post covers some more details on how to be proactive when a Spark job is stuck due to resource constraints.

A little recap of YARN.
Apache YARN is the resource management and job scheduling framework of the Hadoop distributed processing stack.

MapReduce NextGen Architecture

Hadoop clusters are shared between teams, and for proper utilization each team/project is allocated some capacity of the cluster.

One of the popular scheduling approaches for a multi tenant cluster is the "Capacity Scheduler", and it is based on queues.
YARN allows min & max resources to be defined per queue, and queues are hierarchical. It looks something like below

Capacity Scheduler


One of the issues that can happen with the Capacity Scheduler is that your job is submitted to an overloaded queue and gets stuck in the Accepted state for a long time, although other queues have capacity that is just left unused.
Another common issue is that a job starts running but does not get all the resources (cores/memory) it asked for, and runs for a very long time because the queue is overloaded.

YARN provides a REST API to query the state of the cluster/queues/applications, and that can be used to solve issues where resources are available in the cluster but the application is not using them :-)

Yarn API to build adaptive job submission
The YARN API comes in very handy for solving both of the above issues. Some strategies that use the YARN API:

 - Submit job to queue that has capacity.
This strategy selects the queue at run-time and submits the application to the least loaded queue.

 - Move Job to queue that has capacity.
This strategy monitors job status, and if the job is not progressing or is stuck in the "Accepted" state, it moves the job to a queue that has some capacity.

A small abstraction over the YARN API exposes the minimum details needed for adaptive job submission.

Once we have all the metrics required for making a decision, it becomes straightforward to submit/move the job.
The code snippet below tries to move the job based on a simple strategy: a max wait time for an app in Accepted status.
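The decision logic itself is small once the metrics are in hand. Here is a minimal sketch in Go of both strategies, under stated assumptions: queueInfo, appInfo, leastLoadedQueue and targetQueue are hypothetical names made up for illustration, and the fields are a trimmed-down guess at what would be read from the YARN REST responses. The post's real code lives in the linked project and may differ.

```go
package main

import (
	"fmt"
	"time"
)

// queueInfo and appInfo are hypothetical, trimmed-down views of the data
// that would be read from the YARN REST API (assumed shape, not YARN's).
type queueInfo struct {
	name         string
	usedCapacity float64 // percent of the queue's capacity currently in use
}

type appInfo struct {
	id        string
	state     string // e.g. "ACCEPTED" or "RUNNING"
	queue     string
	startedMs int64 // submission time, epoch milliseconds
}

// leastLoadedQueue returns the queue with the lowest used capacity.
// ok is false when no queues are given.
func leastLoadedQueue(queues []queueInfo) (best queueInfo, ok bool) {
	for i, q := range queues {
		if i == 0 || q.usedCapacity < best.usedCapacity {
			best = q
		}
	}
	return best, len(queues) > 0
}

// targetQueue returns the queue an app should be moved to, or "" when
// the app should stay where it is.
func targetQueue(app appInfo, queues []queueInfo, nowMs, maxWaitMs int64) string {
	if app.state != "ACCEPTED" || nowMs-app.startedMs < maxWaitMs {
		return "" // running already, or has not waited long enough
	}
	best, ok := leastLoadedQueue(queues)
	if !ok || best.name == app.queue {
		return "" // nowhere better to go
	}
	return best.name
}

func main() {
	queues := []queueInfo{{"etl", 98.5}, {"adhoc", 20.0}, {"reports", 65.0}}
	nowMs := time.Now().UnixNano() / int64(time.Millisecond)
	tenMinutes := int64(10 * time.Minute / time.Millisecond)
	app := appInfo{id: "app-1", state: "ACCEPTED", queue: "etl", startedMs: nowMs - tenMinutes}

	// Move the app if it has been waiting more than 5 minutes.
	fmt.Println("move to:", targetQueue(app, queues, nowMs, tenMinutes/2))
}
```

Keeping the decision in pure functions like these makes the strategy easy to test without a cluster; the REST calls that fetch the metrics and perform the move can then be wrapped around it.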

YARN exposes lots of metrics that can be used for building an adaptive system. You can refer to ResourceManagerRest for the full set of APIs.

A word of caution: be fair when you use this strategy, don't take the whole cluster for yourself.


Code used in post is available @ yarn github project

Sunday, 3 February 2019

Golang control statements

We are going to explore Go lang control structures. This does not cover every control statement, but you can refer to control-structures from Effective Go to get all the details.

Refer to index page for all the content written so far.

One thing that I like about the control structures is that they are very easy to understand.
The focus on readability is clearly seen.


If statement
The Go lang authors managed to remove the extra parentheses in the if statement, it looks something like

if x > 10 {
fmt.Println("I am gt ", x)
} else {
fmt.Println("I am lt ", x)
}

Another variation includes both initialization and condition

if value := time.Now().Weekday(); value == time.Sunday {
fmt.Println("Yahoooo.. today is sunday")
} else {
fmt.Println("Lets get back to work. I hate", value)

}

Switch Statement
Switch case has few variations 

Simple one
value := 10
switch value {
case 10:
fmt.Println("Value is 10")
default:
fmt.Println("Some other value than 10")

}

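With No expression
// Cases are checked top to bottom and the first match wins,
// so the narrower condition has to come first.
switch {
case value >= 20:
fmt.Println("Value is gte 20")
case value >= 10:
fmt.Println("Value is gte 10")

}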

Switch with multiple condition in single case

specialValue := '@'
switch specialValue {
case '@', '!', '#':
fmt.Println("This is special value")
default:
fmt.Println("This is normal value")

}

Switch with type assertion 
The variable.(type) expression can only be used in a switch statement (a type switch). A plain type assertion like variable.(string) can be used anywhere.

var t interface{}
t = "James"
switch t.(type) {
case int:
fmt.Println("Int value", t)
case string:
fmt.Println("String value", t)

}
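A plain type assertion can also be written outside a switch using the comma-ok form, while the .(type) form is reserved for type switches. A small sketch of both (the describe helper is made up for illustration):

```go
package main

import "fmt"

// describe uses a type switch and binds the concrete value to v.
func describe(t interface{}) string {
	switch v := t.(type) {
	case int:
		return fmt.Sprintf("int %d", v)
	case string:
		return fmt.Sprintf("string %q", v)
	default:
		return fmt.Sprintf("other %T", v)
	}
}

func main() {
	var t interface{} = "James"

	// Comma-ok type assertion: ok is false on a mismatch, no panic.
	if s, ok := t.(string); ok {
		fmt.Println("string of length", len(s))
	}
	if _, ok := t.(int); !ok {
		fmt.Println("not an int")
	}

	fmt.Println(describe(t))
	fmt.Println(describe(42))
	fmt.Println(describe(3.5))
}
```

Binding the value with v := t.(type) gives each case a variable of the matched type, which avoids a second assertion inside the case body.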

Loops
Go has only one loop keyword (for) and it can be used for all purposes, including while style loops.

C/Java like
It has init, condition and post sections.

value := 0
for counter := 0; counter < 10; counter++ {
value++
}

fmt.Println(value)

Just condition
value = 0
for value < 10 {
value++
}

fmt.Println(value)

Infinite (with no condition)

for {
value++
if value > 10 {
break
}

}

Smart loops
This is useful when dealing with arrays/map/channels

days := []string{"Sunday", "Monday"}
for index, value := range days {
fmt.Println("Index ", index, "Value ", value)

}

The range keyword is very powerful, it works with all the collection types.
Another thing I like about golang is that the compiler helps with a lot of common errors, e.g. an unused variable is a compile error, so the example below fails to compile because index is not used.

days := []string{"Sunday", "Monday"}
for index, value := range days {
fmt.Println("Value ", value)

}

It is possible to ignore the index (or the value) by using "_", e.g.

days := []string{"Sunday", "Monday"}
for _, value := range days {
fmt.Println("Value ", value)

}

Sample used in this post is available @ 003-statement github repo

Saturday, 26 January 2019

Poorman Spark monitoring

Spark exposes lots of metrics that give insight into what is happening inside a Spark application, but sometimes you are just looking for quick metrics on a Spark application.

In this post I will share examples of some metrics that can be collected quickly using a simple pattern.

How long is my Spark application waiting for resource allocation?
I always felt the need for this metric when running in a shared cluster with limited capacity allocated to each user.
This metric is useful for knowing when a Spark job is stuck because the cluster is busy.

The pattern is very simple: start a timer thread that monitors Spark context creation and logs the elapsed time at regular intervals.

Code snippet for monitor code

Just start the timer before SparkContext is created, using the code below

 monitoringExecutor.submit(newCallable(checkSparkContext))

How many records is a Spark job/stage processing?

This is based on the pattern that we need a distributed counter to track how many records are processed by a stage.
Spark has something called LongAccumulator that can be used for capturing metrics like this.

So we need a block of code that just logs the value of the accumulator and takes some action if it is not moving fast enough.
Code snippet for tracking records processed.

monitorRecordsProcessed is submitted for async execution, and processData in the map function increments the counter.

A note about accumulators: these are shared variables between driver & executors. If an accumulator is updated on the executor side, there is a chance of multiple/double writes due to retries of failed stages.
So take that into account when dealing with accumulators. They are very good as a debugging tool or for giving interactive feedback on processing, but they can contain some noise when jobs/stages are failing.

Spark itself uses accumulators to track stage internal metrics, and all of that is available on the Spark UI.

How do we get access to Spark internal metrics?

Now we are getting into rich man monitoring. Let's look at an example that gives access to the internal accumulators and also exposes an API to get all the metrics during job execution.

The code below logs all the accumulators at stage level. The only rule is to give the accumulator a name, so that it is available alongside the internal Spark metrics.

Once the listener is defined, just add it to sparkContext

sparkSession.sparkContext.addSparkListener(new StageAccumulatorListener)

This listener will start showing all the accumulators, sample of logs

The record counter also comes in this list because it was a named accumulator.

Spark gives an API to get access to metrics during execution, and it can be used to build a proactive monitoring system.

All the code used in this post is available on poormonitor github



Sunday, 20 January 2019

Value of pass by value in GoLang

Now we are getting into some of the core concepts! Knowing this is very important for understanding the impact a Go program will have on the machine.

Everything is passed by value in Go, no matter what you pass. It is also a case of what you see is what you get.


Each goroutine (i.e. path of execution) gets a stack, which is contiguous memory. A goroutine needs the stack to do all the allocation required. We will learn about goroutines later, but a goroutine is just like a thread, only much lighter.


As a goroutine executes a function, the function gets a slice or portion of memory from the stack that was allocated.


Let's try to understand with a simple example


func main() {

counter := 0

counter++
fmt.Println("In main", counter)
inc(counter)
fmt.Println("After inc", counter)
}


Stack frame state when inc is executing

Stack Frame

A function can only read/write directly within its own stack frame, that is the reason why function parameters are required.
In the above example any change done by the inc function is local to its stack frame, and if it wants to share the change with the caller then it has to return it, so that the value can be copied to the caller's frame.
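The inc function itself is not shown above. A minimal runnable version of the example, assuming inc simply increments its parameter (its body here is an assumption) and returns the result so the caller can choose to copy it back:

```go
package main

import "fmt"

// inc receives a copy of the caller's value; the increment only
// changes the copy that lives in inc's own stack frame.
func inc(counter int) int {
	counter++
	return counter
}

func main() {
	counter := 0
	counter++
	fmt.Println("In main", counter)

	inc(counter) // result discarded: main's counter is unchanged
	fmt.Println("After inc", counter)

	counter = inc(counter) // copy the returned value back into main's frame
	fmt.Println("After assigning result", counter)
}
```

The first two lines print 1 and the last prints 2, which shows that only the explicit assignment carries the change back to the caller.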

Another interesting property of a stack frame is that it is reusable, e.g. after the inc function completes execution, its stack frame memory is available to the next function call.
So it is like incrementing a pointer in the stack to allocate memory to a function, and once that function completes, decrementing the pointer to mark the memory as free.

Pass by value is required for safety and for reasoning about code, which is missing in many languages.

Let's explore how all this changes when a pointer, i.e. the address of a variable, is passed to a function.

Let's try to understand how the stack frame looks when the code below is executed

 func main() {

counter := 0

fmt.Println("Before pointer inc ", counter)
 incByPointer(&counter)
 fmt.Println("After pointer inc ", counter)
}

Stack frame using pointer

  
In the above example the parameter to the function is still passed by value, but this time the value is an address.
The callee knows that it has received the address (&variable) of a variable, and to change the value it has to use a different instruction (*variable)

The asterisk (*) operator allows a function to change a variable that is outside of its own stack frame. This variable can be on the heap or on the caller function's stack.
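For completeness, a runnable sketch of the pointer version. The body of incByPointer is an assumption, since only the call site is shown in the post:

```go
package main

import "fmt"

// incByPointer receives a copy of the address and writes through it,
// so the change lands in the caller's variable.
func incByPointer(counter *int) {
	*counter++
}

func main() {
	counter := 0
	fmt.Println("Before pointer inc ", counter)
	incByPointer(&counter)
	fmt.Println("After pointer inc ", counter)
}
```

The address is still copied into the callee's frame; what is shared is the variable that the address points to.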

Having a clear distinction between passing a value and passing the address of a value is a very powerful thing, as it tells which functions can only read vs which can write.
Anytime you see an address (&) being passed, it is very clear that some mutation may be happening in the function.

No magical modification is possible.

Having a clear distinction has a couple of advantages
 - Compiler can do escape analysis to determine what gets allocated on stack vs heap. This keeps GC happy because stack allocations are cheap and heap allocations have GC overhead
 - You choose when to copy a value vs share a value. This is very useful for large values, you don't want to copy a 1gb buffer into a function.

Go lang gives the developer the option to choose the trade off rather than giving no control.

Let's look at one more example of how allocation works


func allocateOnStack() stock {

google := stock{symbol: "GOOG", price: 1109}
return google
}

func allocateOnHeap() *stock {

google := stock{symbol: "GOOG", price: 1109}
return &google
}

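Both of the above functions create a stock value, but look at the return types: one (allocateOnStack) returns a value and the other one (allocateOnHeap) returns an address.
The compiler's escape analysis sees that the address returned by allocateOnHeap outlives the function call, so that value generally goes on the heap, while the value returned by allocateOnStack can stay on the stack.
So you decide what you want to throw at the GC vs what keeps it happy.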
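To see what the compiler actually decides, the two functions can be put in a small program and built with go build -gcflags=-m, which prints escape analysis decisions. The stock type below is an assumption (the post does not show its definition), and the exact compiler output depends on the Go version and on inlining.

```go
package main

import "fmt"

// stock is assumed to look like this; its definition is not shown in the post.
type stock struct {
	symbol string
	price  float64
}

// allocateOnStack returns a copy of the value to the caller.
func allocateOnStack() stock {
	google := stock{symbol: "GOOG", price: 1109}
	return google
}

// allocateOnHeap returns the address of a local value, so the value
// has to outlive this function's stack frame.
func allocateOnHeap() *stock {
	google := stock{symbol: "GOOG", price: 1109}
	return &google
}

func main() {
	s := allocateOnStack()
	p := allocateOnHeap()
	p.price = 1200 // write through the shared address
	fmt.Println(s, *p)
}
```

Returning the address of a local variable is safe in Go precisely because the compiler moves such values off the stack when needed.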

You might have a question about the stack, like how big is the stack?
Each goroutine starts with a small stack, about 2 KB in recent Go releases, which is enough to hold a good number of function calls.
For most cases that is enough, but if a program continues to put memory pressure on the stack then it grows to meet the need, only for that specific goroutine.
Stack growth has an allocation & copy cost, it is just like allocating a new array and copying the values from the previous array.

One nice thing about stack memory is that it is monitored by the GC, which will reduce the size of the stack if utilization drops to around 25%.


Go gives the power of compact memory layout using structs and efficient memory allocation using pass by value.

All the samples used in this blog are available @ pointers github repo

Sunday, 13 January 2019

How Go lang struct works

This is the 3rd post of my Go lang experiment. If you want to read the earlier posts then go to

is-it-worth-learning-golang
what-are-golang-types

Structs are cool types, they allow you to create user defined types.

Struct basic
Struct can be declared like this

type person struct {
   firstName string
   lastName string
}

This declares a struct with 2 fields.

Struct variables can be declared like this
var p1 person

The var construct initializes p1 to its Zero value, so both the string fields are set to "".

The DOT (.) construct is used to access a field.

How to define struct variables.
There are a couple of ways in which a variable can be created.

var p1 person                                      // Zero value
var p2 = person{}                                  //Zero value
p3 := person{firstName: "James", lastName: "Bond"} //Proper initialization
p4 := person{firstName: "James"}                   //Partial initialization

p5 := new(person) // Using new operator , this returns pointer
p5.firstName = "James"
p5.lastName = "Bond"

Struct comparison
Structs of the same type can be compared using the "==" operator, as long as all their fields are comparable.

p1 := person{firstName: "James", lastName: "Bond"}
p2 := person{firstName: "James", lastName: "Bond"}


if p1 == p2 {
fmt.Println("Same person found!!!!", p1)
} else {
fmt.Println("They are different", p1, p2)
}

This shows the power of pure values, no equals/hashcode type of things are required to compare, the language has first class support for comparing by value.

Struct conversion
Go lang does not have casting, it supports conversion, and that applies to any type, not just structs.

Casting keeps the source object reference and puts the target object struct/layout on top of it, so after casting any change made to the source object is visible through the target object.
This is good for reducing memory overhead, but for safety it can cause big problems because values can change magically via the source object.

On the other hand conversion copies the source value, so after conversion source and target have no link, and changing one does not impact the other. This is good for type safety and makes code easy to reason about.

Lets look into some conversion example of struct.

type person struct {
   firstName string
   lastName string
}

type anotherperson struct {
firstName string
lastName  string
}

Both of the above have the same structure, but these two can't be assigned to each other without conversion.

p1 := person{firstName: "James", lastName: "Bond"}
anotherp1 := anotherperson{firstName: "James", lastName: "Bond"}


p1  = anotherp1 //This is compile time error
p1 = person(anotherp1)//This is allowed

The compiler is smart enough to figure out that these two types are compatible and that conversion is allowed.
Now if you go and change the anotherperson struct, e.g. drop a field, add a new field or change the field order, then the types are no longer compatible and the compiler stops this!

When it does allow conversion, it allocates new memory for the target variable and copies the value.

For eg
p1 = person(anotherp1)
anotherp1.lastName = "Lee" // Will have no effect on p1


How struct are allocated

Since struct is a composite type, understanding its memory layout is very useful for knowing what kind of overhead it comes with.

Current processors do some cool things for fast & safe read/write.
Memory allocation is aligned to the word size of the underlying platform (32 bit or 64 bit), and each value is also aligned based on the size of its type, e.g. a 4 byte value is aligned to a 4 byte address.

Alignment is very important for speed and correctness.
Let's take an example to understand this. On a 64 bit platform the word size is 64 bit or 8 bytes, so it takes 1 instruction to read 1 word.

Memory Layout
The value shown in red is 2 bytes. If it is allocated across 2 words (i.e. at the boundary of a word), it takes multiple operations to read/write the value, and for writes some kind of synchronization might be required.

Since the value is only 2 bytes it can easily fit in a single word, so the compiler will try to allocate it within a single word

Single word allocation
The above allocation is optimized for read/write. Struct allocation works on the same principle.

Now let's take an example of a struct and see what its memory layout will be

type layouttest struct {
b  byte
v  int32
f  float64
v2 int32
}

Layout of "layouttest" on a 64 bit platform will look something like below

[1 X X X 1 1 1 1][1 1 1 1 1 1 1 1][1 1 1 1 X X X X]

X - is for padding.
Each field is placed at an offset that is a multiple of its own alignment, so 3 bytes of padding follow b to align v, and 4 bytes follow v2 so that the total size stays a multiple of 8.
If we calculate the size of the fields ( 1 + 4 + 8 + 4 = 17) the struct still occupies 3 words ( 8 * 3 = 24), so 7 bytes go to padding.

Go gives full control to the developer over memory layout, because the compiler lays out fields in declaration order, so reordering fields changes where the padding goes.

type compactlyouttest struct {
f  float64
v  int32
v2 int32
b  byte
}

The above struct has its fields reordered in descending order of size, and this gives the memory layout below

[1 1 1 1 1 1 1 1][1 1 1 1 1 1 1 1][1 X X X X X X X]

For these four fields the total is still 3 words, only the padding has moved to the end. In structs where small fields are scattered between larger ones, the same reordering can save whole words, and you might be tempted to always use the compact representation.

You should not do this by default, for a couple of reasons
 - This breaks readability because related fields get moved all over the place.

 - Memory might not be an issue, so it could be just over optimization.

 - Processors are very smart, values are read by cache line and not by word, so the CPU reads multiple words at once and you will rarely see any slowness in reads. You can read about how cache lines work in the cpu-cache-access-pattern post.

 - Over optimization can result in false sharing issues, read concurrent-counter-with-no-false-sharing to see the impact of false sharing in multi threaded code.



So profile the application before doing any optimization.

Go has built in packages for getting memory alignment details & other static information about types.

The code below gives a lot of detail about memory layout
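The original snippet is not reproduced here. As a rough sketch of what such a program could look like, the version below uses the reflect and unsafe packages to print the size, alignment and field offsets of the two structs, so the layout can be checked rather than guessed. The describe and sizeOf helpers are made up for this example, and the numbers printed depend on the platform.

```go
package main

import (
	"fmt"
	"reflect"
	"unsafe"
)

type layouttest struct {
	b  byte
	v  int32
	f  float64
	v2 int32
}

type compactlyouttest struct {
	f  float64
	v  int32
	v2 int32
	b  byte
}

// sizeOf returns the size in bytes of v's type, including padding.
func sizeOf(v interface{}) uintptr {
	return reflect.TypeOf(v).Size()
}

// describe prints size, alignment and per-field offsets of a struct value.
func describe(v interface{}) {
	t := reflect.TypeOf(v)
	fmt.Printf("%s: size=%d align=%d\n", t.Name(), t.Size(), t.Align())
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		fmt.Printf("  %-3s offset=%-2d size=%d\n", f.Name, f.Offset, f.Type.Size())
	}
}

func main() {
	describe(layouttest{})
	describe(compactlyouttest{})

	// The unsafe package exposes the same facts as compile time constants.
	var l layouttest
	fmt.Println(unsafe.Sizeof(l), unsafe.Alignof(l.f), unsafe.Offsetof(l.f))
}
```

Gaps between one field's offset plus size and the next field's offset are the padding bytes.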

The unsafe & reflect packages give a lot of internal details, and it looks like the idea has come from Java

Code used in this blog is available @ 001-struct github

Thursday, 10 January 2019

what are Golang Types


Go is a strongly typed language and type is life. The language has rich types and good support for extending types. Types provide integrity.

In this post I will share some of the primitive types and how Go handles them.

Everything is 0 or 1 in a computer, and only these 2 values are used to represent any value we want.
The arrangement of 0s and 1s tells what the value is.


Take an example of a byte value at some memory location

Binary



What is it? You need type information.

If the type is int then the value is 10, if the type is an enum then it is some other value.

Type information tells us about value and size, e.g. if the type is Boolean then it tells us it is a single byte value.

Information about types supported by Go can be found at the Lang Spec Types page.

How to declare variable ?

var variablename type
variablename := value // Short declaration

Both of the above declare a variable, but the way it is initialized is very different.

var creates the variable and initializes it with the ZERO value of its type. Zero value is very special, it makes code cleaner and less bug prone! No null checks for value types.

Zero value is based on type, so for integer types it is zero, for boolean it is false, and for string it is empty.

Go has some types like int whose size depends on the underlying architecture, e.g. it is 4 bytes on a 32 bit arch or 8 bytes on a 64 bit arch. This is also a good example of mechanical sympathy towards the underlying platform.


Examples of variable declaration
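The original examples are not reproduced here. A few illustrative declarations (variable names are made up) showing var with zero values next to the short declaration form:

```go
package main

import "fmt"

// zeroValues returns variables that were declared with var and never assigned.
func zeroValues() (int, bool, string, float64) {
	var i int
	var b bool
	var s string
	var f float64
	return i, b, s, f
}

func main() {
	var count int     // zero value: 0
	var name string   // zero value: ""
	var retries = 3   // var with initializer, type inferred as int
	city := "London"  // short declaration, type inferred as string
	ratio := 0.75     // short declaration, type inferred as float64

	fmt.Printf("%d %q %d %s %g\n", count, name, retries, city, ratio)
	fmt.Println(zeroValues())
}
```

The short form is only available inside functions; package level variables need var.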



Alias for built in type

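This is a very powerful feature, it allows built in types to be extended by adding behavior.
Example of type alias

In the above example RichInt has a toBinary function that returns the binary value. Strictly speaking, a type that carries its own methods like this is a new defined type based on the built in type rather than a true alias. I will share later how to extend types when we explore methods of types.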
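The RichInt snippet is not reproduced here. A sketch of what such a type might look like, assuming its underlying type is int and that toBinary returns a string (both are assumptions):

```go
package main

import (
	"fmt"
	"strconv"
)

// RichInt is a new defined type whose underlying type is int.
type RichInt int

// toBinary returns the base-2 representation of the value.
func (r RichInt) toBinary() string {
	return strconv.FormatInt(int64(r), 2)
}

func main() {
	var v RichInt = 10
	fmt.Println(v.toBinary()) // 1010
	fmt.Println(v + 5)        // arithmetic still works: 15
}
```

A RichInt and an int are distinct types, so moving between them needs an explicit conversion such as int(v).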

Casting Vs Conversion
Casting is magic, it allows one type to be converted to another implicitly. How many times in Java have you lost value when casting long/int or double/float?
Go has the concept of conversion: you explicitly convert from type x to type y and pay the cost of extra memory in exchange for safety.

Go lang spec has some good examples.
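A small sketch of explicit numeric conversion (the helper names toInt8 and toInt are made up). The loss of information is still possible, but it is visible at the call site instead of happening silently:

```go
package main

import "fmt"

// toInt8 converts explicitly; only the low 8 bits of v are kept.
func toInt8(v int64) int8 {
	return int8(v)
}

// toInt converts explicitly; the fractional part is dropped.
func toInt(f float64) int {
	return int(f)
}

func main() {
	var big int64 = 300
	var f float64 = 3.9

	// var small int8 = big // compile error: no implicit conversion

	fmt.Println(toInt8(big), toInt(f)) // 44 3
}
```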

Some real custom types
Go lang has support for the struct type. It is a pure value type, with no noise of behavior attached to it.
It gives control of memory layout, you can choose a really compact memory layout to avoid padding, or add padding if required.

Struct can be declared like below

type person struct {
firstName string
lastName  string
age       int
}

Once the struct is defined we can create values of the struct type.
It is a value, not an object, just remember that!

A value can be created using the code below

var p1 person

The above code creates a value and initializes it with the zero value: strings are initialized to empty and int to 0.
No null check is required when processing p1 because it is initialized to its ZERO value

A short declaration can be used to specify non zero values

p2 := person{firstName: "James", lastName: "Bond", age: 35}

Zero value and a convenient way of creating values kill the need for constructors or destructors in Go.
You can now start seeing the power of values. No overhead of constructor/destructor or complex life cycle.

I know you will have a question: what about special init code or clean up code that is required?

Behavior is handled very differently, we will go over that in a later post.

Structs can also be nested, and Zero value or short declaration works like magic!

We will create some additional structs

type address struct {
address1 string
address2 string
city     string
}

type contact struct {
landLine int
mobile   int
}

type person struct {
firstName      string
lastName       string
age            int
add            address
contactDetails contact
}

p3 := person{firstName: "James", lastName: "Bond", age: 35,
add:            address{address1: "30 Wellington Square", address2: "Street 81"},
contactDetails: contact{mobile: 11119999}}

Code used in blog is available @ letsgo github repo