Showing posts with label BigData. Show all posts
Showing posts with label BigData. Show all posts

Sunday, 13 December 2020

Sorting under constraint algorithms

In the compiler world code inlining is the most important optimization that enables other optimization like dead code elimination, pre fetching , out of order execution etc.

Similarly sorting also enables many optimizations like binary search , streaming aggregation, range search operations, prefix compression, run length encoding , delta encoding, understanding trends, posting list, data partition  etc.



Sorting is memory and compute intensive work and many times we don't have enough compute/memory to do it.

In this post I will share 2 sorting algorithms that can be used when a system has memory or CPU constraint.

Disk based sorting

We have file containing 1 TB data and we want to sort it. Data is huge to it is not possible to use standard in-memory sorting algorithm for this. 

One way to handle sorting of such data is split it in chunks, sort the chunk in memory, persist chunk to disk and finally merge the sorted chunk using k way merge algorithm.


At high level sort pipeline will look something like below 



Nice thing about this algorithm is that it is Embarrassingly_parallel.

This algorithm is also good example of Divide-and-conquer_algorithm and this technique can be applied both the stages.

This algorithm has 2 stages in the pipeline that can be executed in parallel to take advance of multiple cores.

Lets assume that input file contains 10 Million then it can be decomposed in Split stage


In merge stage we have to do reverse operations of taking multiple files and creating single one.




Split & Merge has different types of compute/disk requirement and it is possible to make both the stage parallel or just one based on constraint.

Overall sort pipeline will look like below.  



This algorithm is used by many databases to manage result sets that can't be fit into memory. 

Important logic in this algorithm is K-Way merge of sorted results. If K is 2 then it is straight forward.

2 Way merge

Merge process is pick head from both iterators and add the value that is less, move pointer of iterator whose value was added.

Need to handle some edge conditions to avoid buffer overflow while reading and handling iterators of different size.


v1.next();
v2.next();

while (v1.hasNext() && v2.hasNext()) {
value1 = v1.value();
value2 = v2.value();
if (isLessThan(value1, value2)) {
buffer.add(value1);
v1.next();
} else {
buffer.add(value2);
v2.next();
}
}

if (v1.hasNext()) {
append(buffer, v1);
} else if (v2.hasNext()) {
append(buffer, v2);
}

K Way merge

Assume K is 4, then one way to merge is to split the whole list in pairs of 2, keep merging in pairs and finally start merge out of 2 way merges. This is a good algorithm but can't take advantage of batching multiple iterators.

Recommended way is to use Heap of K values. This is more efficient as we can process multiple inputs in a single pass and can reduce IO overhead also. 

PriorityQueue<LineIterator> heap=...
LineIterator itr;

while ((itr = heap.poll()) != null) {
write(writer, itr.value());
itr.next();
if (itr.hasNext()) {
heap.add(itr);
}
}

BitMap Sorting

Bitmap is a powerful data structure for searching and has some interesting properties for sort also.

Consider a scenario where the file contains n positive integer and each value is less than K.

K can be really huge depending on max value, to put this in context just by using 256 MB memory billions of int values can be sorted.

Idea is based around an allocated array with every element of K word (i.e 32 or 64). If we used 32 bit words then 32 values can be stored in every slot. Total capacity of this data structure is 32 * len(array).

Setting bit needs 2 information, slot in array and position in that slot.




Bit fiddling enables to pack multiple values in a single word, you want to read more on bit fiddling then refer to bit-fiddling.

In this example bytes is 4 and word size is 32.

Checking for value is straightforward and it involves doing bit wise & on Slot value.

Complete working code 

public static final int NO_OF_BYTE = 4;
private final int WORD_SIZE = 8 * NO_OF_BYTE;
private final int SLOT_SHIFT = NO_OF_BYTE + 1;
private final int[] values;
private final int maxValue;

public BitMapSort(int maxValue) {
this.maxValue = maxValue;
this.values = new int[1 + maxValue / WORD_SIZE];
logUsageInfo(maxValue);
}


public void set(int v) {
this.values[slot(v)] |= position(v);
}

public boolean check(int v) {
int value = this.values[slot(v)] & position(v);
return value != 0;
}

public int position(int v) {
return 1 << (v & WORD_SIZE - 1);
}

public int slot(int v) {
return v >> SLOT_SHIFT;
}


Whole pipeline will look something like below


Trade Off


Nothing is perfect and this also has some constraints and it is good to be aware of it.

- This is a dense value data structure, so if we have to store a value that is of 100 Million then we have to allocate at least 100 million bits ( 95 MB). If values are sparse then find alternate data structure. 

- Thread safety has to be handled at slot level because 32 values are packed in a single slot.

- Values should be distinct but if duplicate values are present and it is going to be less duplicates then additional data structures like maps can be used to keep frequency count. This needs to be handled in a little intelligent way like having some threshold on duplicate value and if it crosses that threshold then it is better to stop accepting value to avoid having everything going to map.

- Iteration. Since this is a compressed representation of dense value, iteration on available value has to be handled in a streaming approach to avoid allocation of huge in memory collection. One of the approach could be having API for consuming single value at a time and let client to decide on what to do with those values, example of such iteration could look something like below

public void consume(IntConsumer consumer) {
IntStream
.range(1, maxValue)
.filter(this::check)
.forEach(consumer::accept);
}

- Range iteration. This data structure is very good for range query.

- Compact set. This is also good DS for set related operations.

Conclusion

These are simple and yet very powerful algorithms and if this fits the bill then it can be the difference between solving the problem or not solving at all. 

Saturday, 26 May 2018

Spark Microservices

As continuation of big data query system blog, i want to share more techniques for building Analytics engine.

Take a problem where you have to build system that will be used for analyzing customer data at scale.

What options are available to solve this problem ?
 - Load the data in your favorite database and have right indexes.
   This works when data is small, when i say small less then 1TB or even less.

 - other option is to use something like elastic search 
Elastic search works but it comes up with overhead of managing another cluster and shipping data to elastic search

 -use spark SQL or presto 
Using these for interactive query is tricky because of minimum overhead that is required to execute query can be more than latency required for query which could be 1 or 2 sec.

 -use distributed In-Memory database. 
This looks good option but it also has some issues like many solution is proprietary and open source one will have overhead similar to Elastic Search.

 - Spark SQL by removing Job start overhead.
I will deep dive in to this option. Spark has become number one choice for build ETL pipeline because of simplicity and big community support and Spark SQL can connect to any data source ( JDBC,Hive ,ORC, JSON, Avro etc).

Analytics query generate different type of load, it only needs few columns from the whole set and executes some aggregate function over it, so column based database will make good choice for analytics query.

Apache Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.
So using Spark data can converted to parquet and then Spark SQL can be used on top of it to answer analytics query.

To put all in context convert HDFS data to parquet(i.e column store), have a micro services that will open Sparksession  , pin data in memory and keep spark session open forever just like database pool connection.

Connection pool is more than decade old trick and it can be used for spark session to build analytics engine.

High level diagram on how this will look like

Spark Session is thread safe, so no need to add any locks/synchronization.
Depending on use case single or multiple  spark context can be created in single JVM.



Spark 2.X has simple API to create singleton instance for SparkContext and handles thread based SparkSession also.
Code snippet for creation spark session


Caution
All this works fine if you have micro service running on single machine but if this micro service is load balanced then each instance will have one context.
If single spark context requests for thousands of cores then some strategy is required to load balancing Spark context creation. This is same as database pool issue, you can only request for resource that is physically available.

Another thing to remember that now driver is running in web container so allocate proper memory to process so that web server does not blow up with out of memory error.

I have create micro services application using Spring boot and it is hosting Spark Session session via Rest API.

This code has 2 types of query
 - Single query per http thread
 - Multiple query per http thread. This model is very powerful and can be used for answering complex query.

Code is available on github @ sparkmicroservices

Sunday, 26 November 2017

Big data query system

HDFS was born on 2006 to fix the distributed storage/computing problem and then came spark riding on cheap hardware in 2012 to fix distributed processing problem in disruptive style.

One problem space that has seen lot of innovative tools is  "How to build information system" for huge data, cheap hardware has enabled to build fast analytics system by using in-Memory techniques.
Data is growing every moment and it is becoming very challenging to all analytics tools to keep-up with it, some system use Column stores, special index like Bitmap index etc to build responsive system.

One more technique called Summarization is also used in many system to solve this problem, in this blog i will discuss some ideas around Summarization and tradeoff of using this technique.

Definition of summarization as per vocabulary.com is

To summarize means to sum up the main points of something — a summarization is this kind of summing up. Elementary school book reports are big on summarization.

When you're a trial lawyer, the last part of the argument you make before the court is called a summation. It included a summarization of the points you have made in the trial so far, but it goes one step further, to push this summarization toward a conclusion you hope the judge or jury will accept.    

This picture explains it, you take something big and make it small by retaining important attributes.

One more technique is Sampling that also helps in reducing size of something that is unmanageable to manageable, but it trade-offs accuracy for this and may system can tolerate some percentage of error.

Sampling is also used to to validate some tests or assumption, so sampling allows to consumer information using limited resource.

Summarization and Sampling can used together to build some fast information system.


One of the simple summary approach is build group the data into different set and each record/row should be just in one set.

Lets take a toy data set to build groups.


yearsexNametype_of_course
1993MaleM1Humanities & Social Sciences
1993MaleM2Mass Communication
1993MaleM3Accountancy
1993MaleM4Business & Administration
1993MaleM5Law
1994MaleM6Education
1994MaleM7Applied Arts
1994MaleM8Humanities & Social Sciences
1994FemaleF1Education
1994FemaleF2Applied Arts
1994FemaleF3Humanities & Social Sciences
1994MaleM9Law


This data is for what type of course people study, it can split into couple of groups using year,sex,type of course.

1993,Male,Humanities & Social Science
1994,Female,Education
1994,Male,Law
1993,Male,Law

Grouping convert huge data set to small set and then sampling can be done based on these groups to pick up items from this group.

One of the way is to take random sample but it has limitation  that some group will be missed and sample data will be not representative to original dataset and it will be impossible to answer many query.

We need smart sampler and stats guru have already figured it out, it is called Stratified Sampling

This sampling involves
 - Create Strata, this is nothing but create groups.




yearsextype_of_courseno_of_graduates
1994MalesHumanities & Social Sciences512
1994MalesBusiness & Administration413
1994FemalesInformation Technology196
1994FemalesArchitecture & Building182
1994FemalesEngineering Sciences227
Total1530


- Decided sample size

Lets take 5% of sample that comes to 77 (5% of 1530)

- Compute contribution of each group


yearsextype_of_courseno_of_graduatesContribution
1994MalesHumanities & Social Sciences51233.46405229
1994MalesBusiness & Administration41326.99346405
1994FemalesInformation Technology19612.81045752
1994FemalesArchitecture & Building18211.89542484
1994FemalesEngineering Sciences22714.83660131
Total1530
- Select sample records

77 records are required and break of 77 will be based weight of individual group


yearsextype_of_courseno_of_graduatesContributionSample Size ( 77)
1994MalesHumanities & Social Sciences51233.4640522925.6
1994MalesBusiness & Administration41326.9934640520.65
1994FemalesInformation Technology19612.810457529.8
1994FemalesArchitecture & Building18211.895424849.1
1994FemalesEngineering Sciences22714.8366013111.35
Number of records selected are based on weight of group and this gives data from each group.

By this approach 1530 records got reduced to 77 , lets try to answer some of query using this sample data, we also try to estimate what it will look at 100% and calculate error.

100 % estimate = 1%  * 100

1% = Sample Answer/Sample %


QuestionActual Answer5% Sample100% ( Estimated)Error
No of people graduated in 199415307715400.006535947712
Males graduated in 1994925469200.005405405405
All Females605306000.00826446281
Engineering Sciences student227112200.03083700441


Look at the error column, worst answer has 3% error and with this small error query can be answer in milli second vs seconds.

I used singapore open data graduates-from-university-first-degree-courses-by-type-of-course to build app that answer query based on sampling.

Code is available @ github

Wednesday, 25 October 2017

Data structure for big data

Today crunching Terabyte of data is very common and it brings up interesting challenges.

Memory is getting cheaper and it is enabling application to keep all the data in memory and process it but some data set are very huge and does not fit in single machine, incase it fits in memory but latency of processing will be not a acceptable.


Lets take a example to understand this

You are building web analytics product that collects all the clicks/page load etc events, each event object will have some of these attributes.

 Volume generated by such type of tracking is huge and to ask any interesting question on such data set is resource intensive.   

Lets look at some of the resource intensive questions 

- How many distinct ip address are present
- How many distinct visitor Id
- Top 10 Pages
- Top X Visitor
- How many times Top X pages are requested
- Any page requested by blacklisted IP or user 


Simple way to solve distinct query is to build set and count the items in set
Building set is going to take memory proportional to number of distinct element and set has some load factor to make sure read/write operation are constant time. So memory wise it is super expensive.

Key insight to solve the problem is how accurate distinct count should be , it is OK to have some error(i.e 5 to 10%) and if answer is yes then some magic can done.

   
Since this problem is about just the count , so we don't have to maintain real value and that will give huge saving, algorithm needs some way to track  whether current value is seen or not.



at high level algorithm is 
 - Compute the hash
 - Find the index in bit vector
- Mark the vector if is not set and increment the distinct counter.

It is very simple algorithm and does the job, it is called Linear Counting.
This works well but it has some limitation that you need to workout size which controls accuracy to get best result it should be close to number of distinct values you expect to see.


There is another class of algorithm called LogLog/Hyper LogLog which more powerful than Linear Counting.


Hash value is represented as binary value and then it is split in sub-string
One part is used to extract which bucket value should go
Next part is used to calculating value and it is done by counting no of leading zeros.

By using this technique each entry is put in different buckets for eg values starting with 01- Goes in X, values starting 001 goes in Y bucket.


To compute the final value all the buckets values are taken and it is averaged by applying some factor.

By using this technique billion elements can be counted by using very less memory.

HyperLogLog paper contains all the details of algorithm.
  
Linear Counting or Hyper Log can be used to answer below questions
- How many distinct ip address are present
- How many distinct visitor Id
- Distinct visitor by region, time etc
- Trending words like google trends
- Near similar document matching to remove duplicate document, it is used by search engine.


This data structure can be used for quick anomaly/fraud detection.
This also can be used for Nested_loop_join , spark is using it in ML lib for MatrixFactorizationModel and few more places to count keys in RDD.    

One of the nice properties of these data structure is merge operation.
for e.g. one hyper log can be maintained for unique visitor for each day and they can merged to get value for any time range.

Lets look at some memory usage numbers, it is computed using stream-lib
In this benchmark i used all the words from HarryPotterseries book.



Actual Count 11983





Type Memory ( KB) Count Error %
Set 431 11983 0
LinearCounting 31 12167 1.5
HyperLog 14 12253 2.2

In less than 1% of memory with 2 % of error this data structure is able to do cool thing !

Another interesting data structure for membership query is bloom filter, i wrote about in mr-cool-bloom-filter blog post.

Code used for bench-marking is available @ DistinctCountingApp.java

Saturday, 20 July 2013

ArrayList Using Memory Mapped File

Introduction
In-Memory computing is picking up due to affordable hardware, most of the data is kept in RAM to meet latency and throughput goal, but keeping data in RAM create Garbage Collector overhead especially if you don't pre allocate.
So effectively we need garbage less/free approach to avoid GC hiccups

Garbage free/less data structure
There are couple of option to achieve it
 - Object Pool 
Object pool pattern is very good solution, i wrote about that in Lock Less Object Pool blog

- Off Heap Objects
JVM has very good support for creating off-heap objects. You can get rid of GC pause if you take this highway and highway has its own risk!

-MemoryMapped File
This is mix of Heap & Off Heap, like best of world.

Memory mapped file will allow to map part of the data in memory and that memory will be managed by OS, so it will create very less memory overhead in JVM process that is mapping file.
This can help in managing data in garbage free way and you can have JVM managing large data.
Memory Mapped file can be used to develop IPC, i wrote about that in power-of-java-memorymapped-file blog

In this blog i will create ArrayList that is backed up by MemoryMapped File, this array list can store millions of object and with almost no GC overhead. It sounds crazy but it is possible.

Lets gets in action
In this test i use Instrument object that has below attribute
 - int id
 - double price

So each object is of 12 byte.
This new Array List holds 10 Million Object and i will try to measure writer/read performance

Writer Performance



X Axis - No Of Reading
Y Axis - Time taken to add 10 Million in Ms









Adding 10 Million element is taking around 70 Ms, it is pretty fast.

Writer Throughput
Lets look at another aspect of performance which is throughput


X Axis - No Of Reading
Y Axis - Throughput /Second , in Millions









Writer throughput is very impressive, i ranges between 138 Million to 142 Million

Reader Performance

X Axis - No Of Reading
Y Axis - Time taken to read 10 Million in Ms









It is taking around 44 Ms to read 10 Million entry, very fast. With such type of performance you definitely challenge database.

 Reader Throughput

X Axis - No Of Reading
Y Axis - Throughput /Second , in Millions









Wow Throughput is great it is 220+ million per second

It looks very promising with 138 Million/Sec writer throughput & 220 Million/Sec reader throughput.

Comparison With Array List
Lets compare performance of BigArrayList with ArrayList,

Writer Throughput - BigArrayList Vs ArrayList




 Throughput of BigArrayList is almost constant at around 138 Million/Sec, ArrayList starts with 50 Million and drops under 5 million.

ArrayList has lot of hiccups and it is due to 
 - Array Allocation
 - Array Copy
 - Garbage Collection overhead

BigArrayList is winner in this case, it is 7X times faster than arraylist.

Reader Throughput - BigArrayList Vs ArrayList

ArrayList performs better than BigArrayList, it is around 1X time faster.

BigArrayList is slower in this case because
 - It has to keep mapping file in memory as more data is requested
 - There is cost of un-marshaling

Reader Throughput for BigArrayList is 220+ Million/Sec, it is still very fast and only few application want to process message faster than that.
So for most of the use-case this should work.

Reader performance can be improved by using below techniques 
 - Read message in batch from mapped stream
 - Pre-fetch message by using Index, like what CPU does

By doing above changes we can improve performance by few million, but i think for most of the case current performance is pretty good

Conclusion
Memory mapped file is interesting area to do research, it can solve many performance problem.
Java is now being used for developing trading application and GC is one question that you have to answer from day one, you need to find a way to keep GC happy and MemoryMapped is one thing that GC will love it.

Code used for this blog is available @ GitHub , i ran test with 2gb memory.
Code does't handle some edge case , but good enough to prove the point that that MemoryMapped file can be winner in many case.