Tuesday, 20 October 2020

Histogram is not Instagram

A histogram is a very popular technique to represent the distribution of numerical data. This representation can be used for various things like measuring request latency, tracking data cardinality, grouping data into bins, and looking at the frequency density of values.

In this post I will share some of the common use cases for histograms.

Measuring latency

Assume that we want to measure the response time of an API call. The first thing that comes to mind is a stopwatch! The code snippet will look something like this.

// Assuming Apache Commons Lang's StopWatch here (org.apache.commons.lang3.time.StopWatch);
// Spring's StopWatch exposes the same start/stop/toString calls.
StopWatch stopWatch = new StopWatch();
stopWatch.start();

// Some time consuming task

stopWatch.stop();

System.out.println(stopWatch.toString());

This is a good starting point but it does not take us much further :-) For serious measurement we need more data points, like the best response time, the worst, and response times at various percentiles, for example how long 90% of requests are taking. Such a summary allows us to understand service latency in more detail and enables better decisions. If the data is available at different percentiles then it can be plotted to understand it better.


For a consumer facing service, slow responses mean bad user experience and loss of revenue. Industries like e-commerce and video streaming track response time proactively to retain customers.


Data cardinality

Tracking the cardinality of a dimension is very important in data intensive applications. It can be used for identifying skew in data or for finding the optimal plan for executing a read or write request. One such example is the index statistics maintained by databases. A cost based optimizer uses index stats to decide which index will result in the fastest execution.

For a time series database, tracking the distribution of data by time of day is useful for finding peak/busy times, and that information can drive custom partitioning logic to manage load. The naive way of doing this is a frequency map, but it soon runs into performance issues and also limits the options for range queries.

A histogram comes in very handy for such scenarios. The chart below shows distribution by time.



Frequency density

This is a popular use case in the data science community. For example, an e-commerce company will be interested in knowing the spend bins of its customers, or a restaurant will be interested in knowing the frequency of visits.

These metrics can help the business identify wealthy customers and come up with better strategies to retain them or acquire new ones. The example below from Investopedia contains people density by age, very useful information for a portfolio manager.



Histograms have many more interesting use cases. One cool property of a histogram is that it is commutative: the order of items makes no difference to the final output, which makes it possible to build histograms in parallel and merge them later.

Another important property of a histogram is that it is cumulative: this allows cumulative calculations by merging multiple bins/buckets.


Code Snippet 

Now let's look at one histogram library that can be used for these use cases.

HdrHistogram by Gil Tene is a battle-tested library built for measuring application latency. I found this library to be useful for other use cases as well. It is very memory efficient, designed to work in low latency environments, and also has a very compact lossless serialization format.

Latency Measurement

// Assumes: import org.HdrHistogram.Histogram;
//          import static java.util.stream.IntStream.range;
Histogram histogram = new Histogram(2);
range(0, 1000_000)
        .forEach(v -> {
            long timeTaken = doSomething(v); // doSomething(v) does the work and returns the time it took
            histogram.recordValue(timeTaken);
        });
histogram.outputPercentileDistribution(System.out, 1.0);
The Histogram class is part of the HdrHistogram library and accepts various parameters. In this example we are specifying the precision to use (the number of significant value digits). This value can range from 0 to 5, and precision has a direct impact on the size of the histogram.

Another important thing about this library is that the histogram auto-resizes based on input values, which plays a big role in avoiding upfront allocation.
Running the above code will produce output like below.


Let's try to understand the output. It contains 4 columns (Value, Percentile, TotalCount, 1/(1-Percentile)).

Value - the recorded value (here, response time) at the given percentile.
Percentile - the percentile rank, i.e. the fraction of records at or below this value.
TotalCount - the cumulative count of records at or below this value.
1/(1-Percentile) - the reciprocal of the remaining fraction, useful for plotting the tail on a log scale.

Let's pick some bucket values to understand how to use it.


50% of requests took 5 seconds - this metric is very common in measurements but of little use because it behaves like the average/mean. If you want to do serious measurement then stay away from averages.

97.5% of requests completed within 9.7 seconds - this is where it becomes interesting: the slower requests take almost double the median! That is a really bad user experience.

99.86% of requests took 10+ seconds - this shows the real worst case; 200K+ requests are part of this group and they take double the time compared to the average.
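
These percentile values can also be queried programmatically instead of reading the printed distribution. A minimal sketch against the histogram built above (getValueAtPercentile is part of the HdrHistogram API):

// Query specific percentiles directly from the histogram
long p50  = histogram.getValueAtPercentile(50.0);
long p975 = histogram.getValueAtPercentile(97.5);
long p999 = histogram.getValueAtPercentile(99.9);
System.out.println("p50=" + p50 + " p97.5=" + p975 + " p99.9=" + p999);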

Having latency broken down by percentile provides great insight into the real customer experience. If you are an e-commerce shop then every microsecond counts and can translate into a big gain or loss.
Trading applications are also sensitive to latency spikes, which make a big difference to how fast buy/sell trades can be matched.

Another thing that is very common in the microservices world is that rendering a full page requires hundreds of API calls, which adds the risk that at least one of those calls gets hit by worst case latency, so the overall user experience becomes bad :-(

Next time anyone talks about latency, ask for the 99.9999 percentile to know the real impact.
 

Data cardinality

Databases have rule based and cost based optimizers. A cost based optimizer generates plans based on the cost of operations like joins, filters etc. and quickly selects the best plan.
One of the important factors in deciding which plan to use is the data distribution/stats. Each index maintains data cardinality stats so that the optimizer can quickly figure out which index to use.

Cardinality is also useful for finding data skew and coming up with a plan to handle it. Distributed computing frameworks like Apache Spark suffer from skew, and it is up to the engineer to add some intelligence in code to avoid Spark job failures.

Trust me, real data is always skewed.

Let's take a scenario where an e-commerce giant wants to keep statistics like purchase volume at the merchant level, so that it can come up with better platform scaling options.

A histogram internally maintains frequency counts, so it can be used to calculate total volume at the merchant level in an efficient way.
For this scenario let's assign a unique integer id to each merchant and record purchases in the histogram; for recording a purchase we pass the merchant id to the histogram.

Some code on how this can be done. 

Histogram histogram = new Histogram(2);
range(0, 1000_000 * 100)
        .forEach(v -> {
            int merchant = merchant(); // merchant() returns the merchant id for a transaction
            histogram.recordValue(merchant);
        });
histogram.outputPercentileDistribution(System.out, 1.0);
In the above example merchant ids from 100 million transactions are used to build the histogram. To show some real skew I have given 50% weight to one particular merchant (i.e. merchant id 101).

Running the above code will produce something like below.

 Value     Percentile TotalCount 1/(1-Percentile)

        1.00 0.000000000000       1043           1.00
      101.00 0.100000000000   50050299           1.11
      101.00 0.200000000000   50050299           1.25
      101.00 0.300000000000   50050299           1.43
      101.00 0.400000000000   50050299           1.67
      101.00 0.500000000000   50050299           2.00
    10047.00 0.550000000000   55023464           2.22
    20095.00 0.600000000000   60047899           2.50
    30079.00 0.650000000000   65038139           2.86
    40191.00 0.700000000000   70092127           3.33
    50175.00 0.750000000000   75086627           4.00
    55039.00 0.775000000000   77520056           4.44
 
50% of transactions (i.e. 50 million) belong to merchant 101. This information can be used for optimizing read or write queries, or for coming up with good product offerings.

One thing to remember is that histograms can be merged, so it is possible to build histograms on multiple nodes and later merge them to get the full view; in fact it is possible to build a histogram incrementally.
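
A minimal sketch of such a merge, assuming two HdrHistogram instances built on different nodes (Histogram.add is part of the library API):

Histogram node1 = new Histogram(2);
Histogram node2 = new Histogram(2);
// ... each node records its own transactions ...
node1.recordValue(101);
node2.recordValue(20095);

// Merge both partial histograms into a single full view
Histogram merged = new Histogram(2);
merged.add(node1);
merged.add(node2);
merged.outputPercentileDistribution(System.out, 1.0);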

Next time data skew is troubling you, try a histogram.

Frequency Density 

This use case takes histograms to the next level. Let's extend the e-commerce example: now we want to rank customers based on the total dollar value they spend.

A histogram comes in handy; we just have to feed in the customer-wise total spend and we get buckets that can be used for ranking customers.

Histogram histogram = new Histogram(2);
range(0, 1000_000 * 100)
        .forEach(v -> {
            long spend = spendValue(); // spendValue() returns the spend amount for a purchase
            histogram.recordValue(spend);
        });
histogram.outputPercentileDistribution(System.out, 1.0);


Again taking 100 million purchases, this time recording the spend value. This creates buckets based on spend, and later customers can be ranked using these buckets.

The bucket output will look something like below.

 Value     Percentile TotalCount 1/(1-Percentile)

        0.00 0.000000000000       4963           1.00
     2007.00 0.100000000000   10036946           1.11
     4015.00 0.200000000000   20077808           1.25
     6015.00 0.300000000000   30075647           1.43
     8031.00 0.400000000000   40157837           1.67
    10047.00 0.500000000000   50238571           2.00
    11007.00 0.550000000000   55038969           2.22
    12031.00 0.600000000000   60159541           2.50
    13055.00 0.650000000000   65279759           2.86
    14015.00 0.700000000000   70077894           3.33
 
This output can be interpreted as: 10 million customers spend in the range of 0 to 2K, another 10 million spend 2K to 4K, and so forth. The higher buckets will contain the premium customers.
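
To place an individual customer in a bucket, the percentile for a given spend can also be looked up directly. A minimal sketch against the histogram built above, using getPercentileAtOrBelowValue from the HdrHistogram API and a hypothetical spend of 13,000:

// Where does a customer with a total spend of 13,000 rank?
double percentile = histogram.getPercentileAtOrBelowValue(13_000);
System.out.println("Customer is in the top " + (100.0 - percentile) + "% of spenders");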

Histograms have many more applications; they can be used in machine learning to build features because they allow vector style arithmetic like add/subtract.

All the code used in this post is available @ measure github

Sunday, 27 September 2020

Let the Stream flow

Java Streams is one of the powerful features of the JDK, built on top of lambdas.

This post is a quick refresher of Streams concepts using learning tests.



Streams are made up of

Source |> map |> filter |> reduce

Stream basic

A Stream is a computation pipeline that starts with a source, goes through a series of intermediate operations, and ends with a terminal operation.

A Stream is expressed as a pipeline of functional transformations, which enables optimal execution strategies like lazy execution, short circuiting & fusion of operations.

These execution strategies avoid unnecessary materialization of data because many things are done in a single pass or by multiplexing.

Streams can also be seen as SIMD at the application layer. A stream is made of stateless & stateful operations; stateless operations are part of a single stage of the pipeline.

Stateless operations like (map, flatMap, filter) are fused to provide optimal execution, and only stateful operations like (sorted, takeWhile, dropWhile, limit, distinct) add a barrier or a new stage in the pipeline.

Since a stream is a computation pipeline, it takes advantage of CPU caches by performing all the transformations on a single element while it is hot in cache. Sometimes this execution strategy is called depth first: go to the leaf and process it.

Accessing data while it is hot in the CPU cache makes a big difference to performance; you can read more about it in the post cpu-cache-access-pattern.
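
This depth first, element-at-a-time behaviour is easy to observe with peek. A minimal sketch (values are only for illustration; assumes java.util.stream.Stream is imported):

// Each element flows through the whole fused pipeline before the next one is pulled
Stream.of("a", "b", "c")
        .peek(v -> System.out.println("source : " + v))
        .map(String::toUpperCase)
        .peek(v -> System.out.println("mapped : " + v))
        .forEach(v -> System.out.println("sink   : " + v));
// prints source/mapped/sink for "a", then for "b", then for "c"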

A stream ends with a terminal operation. Terminal operations are either short-circuiting (allMatch, findFirst, anyMatch) or non short-circuiting (reduce, collect, forEach).

Short circuiting causes early termination and is very useful for search related operations.

Non short-circuiting operations touch every element of the stream; reduce & collect are examples of such operations and allow solving very complex problems.

Streams favor reduction/folding over imperative accumulation; reductions are easy to parallelize and simple to understand. This also opens up embarrassingly parallel opportunities.

Reduction also relies on the associativity property, for example

(+(+ (+ a b) c ) d) = (+ (+ a b) (+ c d))

The above example is a reduction using the plus (+) operator; the left side is a sequential reduction and the right side is parallel, but the output is the same for both execution paths.

Associative operators are embarrassingly parallel.
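
A minimal sketch of the same idea with Java streams: sequential and parallel reduction with an associative operator (Integer::sum) produce the same result (assumes a static import of IntStream.rangeClosed):

int sequential = rangeClosed(1, 1000).reduce(0, Integer::sum);
int parallel   = rangeClosed(1, 1000).parallel().reduce(0, Integer::sum);
System.out.println(sequential == parallel); // true, because + is associative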

The power of streams is in advanced reduction patterns, and the Collectors class has tons of examples.

A Collector accepts a supplier, an accumulator & a combiner. These 3 things are composed to do very complex reductions.

Let's look at a String based reduction: the world famous string joiner.

values
        .stream()
        .collect(StringBuilder::new,                 // supplier
                 (sb, value) -> sb.append(value),    // accumulator
                 (sb1, sb2) -> sb1.append(sb2))      // combiner
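
The same reduction is available out of the box via Collectors.joining, assuming values is a collection of Strings:

// assumes: import java.util.stream.Collectors;
String joined = values.stream().collect(Collectors.joining());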


If a stream ends with a non short-circuiting operation then records are processed in batches (forEachRemaining), and in case of a short-circuiting one they are processed one at a time (tryAdvance).


Stream Operation Flags

Every stream has some characteristics that are used by the stream framework for optimization.
As application programmers we don't get exposed to stream operation flags, but knowing them helps in understanding the optimization techniques used by streams.
  
A stream is defined with characteristics at the source, and stream operations (map, filter, limit, sorted etc.) may preserve, clear or inject new characteristics.
The terminal operation inspects all the characteristics and selects the optimized code path for execution.

A very simple stream characteristic is Parallel; it is taken into account by the stream framework to decide between a single thread and multiple threads for execution.
Some of the other stream flags are
  • Distinct - stream has distinct values.
  • Sorted - elements are sorted by natural order.
  • Ordered - elements have an encounter order.
  • Sized - finite size, important for splitting.
  • Short Circuit - stream can be short circuited, e.g. due to find, limit etc.
Let's take the distinct stream operation to understand how it gets optimized.
Once we have distinct elements we can count them using the below code snippet.

values
        .stream()
        .distinct()
        .count()

Distinct count of elements can be implemented in many ways based on the underlying collection of the stream.

  • List/LinkedList

If the underlying collection is a list then only the brute force way can be used for distinct count: we have to allocate a Set, keep adding elements to it and then return its size. This causes some memory pressure on the system when the collection is large.
 

  • SortedSet
If the underlying collection is a sorted collection like a TreeSet then distinct count does not need any memory allocation; distinct can be computed with a simple loop that compares the current and previous values. The code snippet below does the distinct count.


static int distinct(SortedSet<String> values) {
    Iterator<String> itr = values.iterator();
    if (!itr.hasNext()) return 0;

    String previous = itr.next();
    int itemCount = 1;
    while (itr.hasNext()) {
        String next = itr.next();
        if (!previous.equals(next)) {
            itemCount++;
            previous = next;
        }
    }
    return itemCount;
}

  • Set
If the underlying collection is a Set then it is just a call to its size() function!

This is a simple example of how the stream pipeline can take advantage of stream op flags to plug in optimal code. This is similar to how a database optimizer works and shows the power of declarative programming.
  
We can take this further by adding a new stream op like approximate distinct. It could be based on the HyperLogLog probabilistic data structure, which can handle any type of collection with very little memory overhead by trading off a little bit of accuracy. I shared more about probabilistic data structures in the data-structure-for-big-data post.
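
As a rough illustration of the idea (not an actual stream op), an approximate distinct count can be done with a HyperLogLog implementation such as HllSketch from the Apache DataSketches library; the package, constructor argument and values being a collection of Strings are assumptions here:

// assumes: import org.apache.datasketches.hll.HllSketch;
HllSketch sketch = new HllSketch(12);        // lgConfigK = 12, i.e. 2^12 buckets
values.stream().forEach(sketch::update);     // feed every element into the sketch
System.out.println(sketch.getEstimate());    // approximate distinct count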

The other flags also do lots of magic to make the code fast.

Conclusion

Streams are a very powerful abstraction for solving problems in a declarative way.
Enjoy the various examples of streams in the github project streams. The examples are organized in chapters and cover simple to advanced usage patterns.


Tuesday, 11 August 2020

Speak the language of domain - part 2

While researching for the post speak-language-of-domain I spent some time building an external DSL and experimented with a parser and compiler.

A parser & compiler are important components for an external DSL because they convert the external DSL into an executable.


We will take a few examples of external DSL and look at what is required to build a full solution.

Rule Engine

I will use the below DSL as the external DSL and try to convert it into an executable.



We allow users to upload DSL files and change the behavior of the system at runtime based on the rules in the DSL file. Such a feature makes the system very flexible and powerful.
Remember the Peter Parker principle, "With great power comes great responsibility", so use such things in a production system at your own risk!
     


external DSL pipeline


Loading a DSL file is a trivial thing; it can be uploaded to the system using a web interface.

Once a file is uploaded it needs to be parsed. This is where things become a little interesting: many libraries are available to parse files and extract tokens, but in this example we will do simple parsing and code generation.

For parsing, a template file is used: the content of the DSL file is put into the template file before compilation.

For this example I am going to use the template file below.


The above template file has 2 variables, $REPLACE & //CODE; these two variables are replaced during the parsing and code generation phase, as shown in the sketch below.

$REPLACE - will contain some random value to give a unique name to the block of code that needs to be compiled.

//CODE - will contain the DSL code submitted by the user.
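
A minimal sketch of this replacement step; the template file name, the generated rule name and the uploadedDslCode variable are hypothetical:

// assumes: import java.nio.file.Files; import java.nio.file.Path;
String template = Files.readString(Path.of("RuleTemplate.java.template")); // hypothetical template file
String ruleName = "Rule" + System.nanoTime();           // unique name for the generated class
String source = template
        .replace("$REPLACE", ruleName)                  // give the generated class a unique name
        .replace("//CODE", uploadedDslCode);            // splice in the user submitted DSL code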

Once tokens are replaced we are ready for compilation.

For compilation we will use the JavaCompiler API; this API gives a programmatic interface to the Java compiler. It is a very powerful API for doing things in a running VM.

A code snippet for compiling code using the JavaCompiler interface is sketched below.
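
A minimal sketch of that step, writing the generated source to a file and invoking the system compiler via javax.tools; the paths reuse the hypothetical names from the previous sketch:

// assumes: import javax.tools.JavaCompiler; import javax.tools.ToolProvider;
//          import java.nio.file.Files; import java.nio.file.Path;
Path sourceFile = Path.of("/tmp/dsl", ruleName + ".java");   // hypothetical output location
Files.createDirectories(sourceFile.getParent());
Files.writeString(sourceFile, source);

JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
int result = compiler.run(null, null, null, sourceFile.toString()); // 0 means compilation succeeded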


Once the code is compiled, the rule can be loaded using Java reflection.

A code snippet for the DSLLoader is sketched below.
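
A minimal sketch of such a loader, assuming the compiled class sits in the same hypothetical directory and implements a Rule interface defined by the application (the Rule type is an assumption):

// assumes: import java.net.URL; import java.net.URLClassLoader; import java.nio.file.Path;
URLClassLoader loader = URLClassLoader.newInstance(
        new URL[]{Path.of("/tmp/dsl").toUri().toURL()});         // hypothetical compiled-class location
Class<?> ruleClass = Class.forName(ruleName, true, loader);      // ruleName from the template step
Rule rule = (Rule) ruleClass.getDeclaredConstructor().newInstance(); // Rule is an assumed app interface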


Once the rule is loaded from the DSL it is straightforward to update the current VM to use the newly created rule.

The DSL code is type safe, all the checks are done at compile time, and it is highly optimized code because it has no reflection or other magic, so it benefits fully from the HotSpot compiler.

From the DSL creator's perspective these rules can be tested in a sandbox before using them in production.

A few things that can be improved are the error messages when the DSL has a compilation error; right now it just shows Java error messages, which will not be that user friendly for non Java programmers.

Trading System

In the earlier post the below internal DSL was used.

newOrder(equity("IBM"), (order, ts) -> {
order
.buy(100)
.at(126.07d);

ts.execute(order);
});

newOrder(equity("GOOG"), (order, ts) -> {
order
.sell(100)
.at(1506.85)
.partial();

ts.execute(order);
});

Let's have an external DSL for this.

buy 100 IBM stocks at 122.06

This has a few keywords (i.e. buy, stocks, at) related to the trading domain and also has a few parameters related to the order: 100 (i.e. units), IBM (i.e. the stock) and 122.06 (i.e. the price).
For this example it is easy to write a parser that creates the order object; a rough sketch follows.
Refer to Parser for more details on how the text is parsed and the order object is created.
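
A minimal sketch of such a parser for the single statement above; the Order record is a hypothetical stand-in for whatever the real project uses:

// Hypothetical order holder for illustration
record Order(String side, int units, String stock, double price) {}

static Order parse(String line) {
    // e.g. "buy 100 IBM stocks at 122.06" -> [buy, 100, IBM, stocks, at, 122.06]
    String[] tokens = line.trim().split("\\s+");
    String side  = tokens[0];                     // buy / sell
    int units    = Integer.parseInt(tokens[1]);   // 100
    String stock = tokens[2];                     // IBM
    double price = Double.parseDouble(tokens[5]); // value after the 'at' keyword
    return new Order(side, units, stock, price);
}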

All the code used in this post is available at DSL