Showing posts with label Heap. Show all posts

Tuesday, 15 March 2022

Concurrent Heap data structure

Let's Heapify!!!




A heap is a very popular data structure for solving "top X" types of problems.

For example: find the top 10 items by sales volume, the top X users by activity, etc.

Java's PriorityQueue is based on a heap and can help answer any top X type of query. PriorityQueue is not thread safe, so it can't be used in a highly concurrent environment without adding a lock.
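As a rough illustration of the locking approach, here is a minimal sketch of a top X tracker built on PriorityQueue. The class name TopXTracker and its methods are hypothetical and not part of the original code; every access goes through a synchronized method because PriorityQueue offers no thread safety of its own.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical helper, not from the post: keeps the X largest values seen so far.
class TopXTracker {
    private final int x;
    // Min-heap: the smallest of the current top X sits at the head.
    private final PriorityQueue<Long> heap = new PriorityQueue<>();

    TopXTracker(int x) {
        this.x = x;
    }

    // synchronized because PriorityQueue itself is not thread safe.
    synchronized void add(long value) {
        heap.add(value);
        if (heap.size() > x) {
            heap.poll(); // drop the smallest so only the top X remain
        }
    }

    // Returns a snapshot of the top X values, largest first.
    synchronized List<Long> top() {
        List<Long> result = new ArrayList<>(heap);
        result.sort(Comparator.reverseOrder());
        return result;
    }

    public static void main(String[] args) {
        TopXTracker tracker = new TopXTracker(3);
        for (long sales : new long[] {40, 10, 70, 20, 90, 50}) {
            tracker.add(sales);
        }
        System.out.println(tracker.top()); // [90, 70, 50]
    }
}
```

Every caller contends on the same lock, which is exactly the bottleneck the rest of this post tries to avoid.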

The underlying data structure of a heap is an array, and elements are shifted up and down to maintain the heap order. A swim or sink operation can touch slots anywhere along a root-to-leaf path, so in practice the whole array has to be locked to avoid race conditions.

Even read operations like poll mutate the structure, which makes it hard to share a heap between multiple threads.

The underlying algorithm makes it very hard to use a heap in a concurrent or parallel environment.

Heap - Source: Wikipedia

Let's look at other options to achieve heap-like functionality without giving up on concurrency.

A concurrent heap data structure needs the following properties:

  • Highly concurrent ordered collection.
  • Parallel write/read support.
  • Top X type of API.
  • Multiple top operations supported concurrently on the same instance of the data structure.

The concurrent skip list from the JDK looks like a good candidate for this, but we need to add some missing functionality.

Let's recap how the skip list data structure looks.


SkipList - Source:Wikipedia

A skip list is an ordered linked list with multiple levels: it has some fast lanes and some slow lanes. The fast lanes allow an element to be found at approximately O(log n) cost.


- Unique Item identification 

The JDK has Set and Map implementations of the skip list. A Map/Set can only have unique keys, so we need a way around the unique key requirement to make the set behave like a heap.

We can use a little trick: every value added to the skip list carries additional metadata, which can be a running sequence number or a timestamp. This extra metadata is used to resolve the conflict when 2 items are equal based on comparison.

Let's take the products-by-sales use case for the code samples.

SalesItem is Comparable and compares items by sales volume.

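class SalesItem implements Comparable<SalesItem> {

    private final String product;
    private final long sales;

    SalesItem(String product, long sales) {
        this.product = product;
        this.sales = sales;
    }

    // Orders items by sales volume only; the product name is ignored.
    @Override
    public int compareTo(SalesItem o) {
        return Long.compare(sales, o.sales);
    }
}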

We can't add SalesItem to the skip list directly, because items with the same sales volume will be rejected as duplicates.
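The rejection is easy to reproduce. The sketch below uses a minimal stand-in for SalesItem (the constructor and the product names are assumptions made for the demo) and adds two different products with the same sales volume to a ConcurrentSkipListSet.

```java
import java.util.concurrent.ConcurrentSkipListSet;

class DuplicateSalesDemo {
    // Minimal stand-in for SalesItem; the constructor is added for this demo.
    static final class SalesItem implements Comparable<SalesItem> {
        final String product;
        final long sales;

        SalesItem(String product, long sales) {
            this.product = product;
            this.sales = sales;
        }

        @Override
        public int compareTo(SalesItem o) {
            return Long.compare(sales, o.sales);
        }
    }

    // Adds two different products with equal sales and returns how many were kept.
    static int addTwoWithEqualSales() {
        ConcurrentSkipListSet<SalesItem> set = new ConcurrentSkipListSet<>();
        set.add(new SalesItem("book", 100));
        // compareTo returns 0 here, so the set treats "pen" as a duplicate of "book".
        set.add(new SalesItem("pen", 100));
        return set.size();
    }

    public static void main(String[] args) {
        System.out.println(addTwoWithEqualSales()); // 1
    }
}
```

The set decides equality purely through compareTo, so the second product is silently dropped.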

We can add a wrapper class that carries the extra metadata to handle this problem. It will look something like this:


class Item implements Comparable<Item> {
    private final T value;
    private final long index;

    @Override
    public int compareTo(Item o) {
        int r = this.value.compareTo(o.value);
        // For a max heap, reverse the natural order so the largest value comes first.
        r = heapType.equals(HeapType.Max) ? -r : r;
        if (r != 0) {
            return r;
        }
        // Equal values: fall back to insertion order.
        return Long.compare(index, o.index);
    }
}

index is the extra metadata added to handle items with the same sales volume; in case of a conflict, items are ordered by index.

 - TopX API

For the TopX API, Stream.limit can be used. Another benefit of using streams is that the client application can use the other features of the Streams API.

Full Code for Concurrent Heap

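import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class ConcurrentHeap<T extends Comparable<T>> {

    // Only HeapType.Max appears in this post; Min is the assumed counterpart.
    public enum HeapType { Min, Max }

    private final AtomicLong id = new AtomicLong();
    private final NavigableSet<Item> data = new ConcurrentSkipListSet<>();
    private final HeapType heapType;

    public ConcurrentHeap(HeapType heapType) {
        this.heapType = heapType;
    }

    public void add(T value) {
        // The running id makes every entry unique, even when values are equal.
        data.add(new Item(value, id.incrementAndGet()));
    }

    public Stream<T> stream() {
        return data
                .stream()
                .map(v -> v.value);
    }

    public Stream<T> top(int x) {
        return stream().limit(x);
    }

    class Item implements Comparable<Item> {
        private final T value;
        private final long index;

        Item(T value, long index) {
            this.value = value;
            this.index = index;
        }

        @Override
        public int compareTo(Item o) {
            int r = this.value.compareTo(o.value);
            r = heapType.equals(HeapType.Max) ? -r : r;
            if (r != 0) {
                return r;
            }
            return Long.compare(index, o.index);
        }
    }
}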


The underlying data structure that behaves like a heap is a NavigableSet. The JDK has 2 implementations of it: TreeSet and ConcurrentSkipListSet.

We can choose between TreeSet and ConcurrentSkipListSet based on need, to avoid the cost of concurrency in a single-threaded environment.
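One way to make that choice explicit is to pass the backing set in through a factory method. This is only a sketch under assumptions: PluggableHeap, Entry, singleThreaded and concurrent are hypothetical names, and the variant is max-heap only to keep it short.

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

// Hypothetical max-heap variant with a pluggable backing NavigableSet.
class PluggableHeap<T extends Comparable<T>> {
    private final AtomicLong id = new AtomicLong();
    private final NavigableSet<Entry<T>> data;

    private PluggableHeap(NavigableSet<Entry<T>> data) {
        this.data = data;
    }

    // TreeSet: no concurrency overhead, but only safe from a single thread.
    static <V extends Comparable<V>> PluggableHeap<V> singleThreaded() {
        return new PluggableHeap<V>(new TreeSet<Entry<V>>());
    }

    // ConcurrentSkipListSet: safe for parallel reads and writes.
    static <V extends Comparable<V>> PluggableHeap<V> concurrent() {
        return new PluggableHeap<V>(new ConcurrentSkipListSet<Entry<V>>());
    }

    void add(T value) {
        data.add(new Entry<T>(value, id.incrementAndGet()));
    }

    List<T> top(int x) {
        return data.stream().limit(x).map(e -> e.value).collect(Collectors.toList());
    }

    private static final class Entry<V extends Comparable<V>> implements Comparable<Entry<V>> {
        final V value;
        final long index;

        Entry(V value, long index) {
            this.value = value;
            this.index = index;
        }

        @Override
        public int compareTo(Entry<V> o) {
            int r = o.value.compareTo(value); // reversed: largest value first
            return r != 0 ? r : Long.compare(index, o.index); // ties: insertion order
        }
    }

    public static void main(String[] args) {
        PluggableHeap<Long> heap = PluggableHeap.concurrent();
        for (long sales : new long[] {10, 30, 30, 20}) {
            heap.add(sales);
        }
        System.out.println(heap.top(3)); // [30, 30, 20]
    }
}
```

Both factories expose the same API, so the caller only decides once, at construction time, whether to pay for concurrency.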

 

Full working code for this blog post is available @ github 

Sunday, 13 December 2020

Sorting under constraint algorithms

In the compiler world, code inlining is often called the most important optimization because it enables other optimizations such as dead code elimination, and it produces straight-line code that benefits more from hardware features like prefetching and out-of-order execution.

Similarly, sorting enables many optimizations: binary search, streaming aggregation, range search, prefix compression, run length encoding, delta encoding, trend analysis, posting lists, data partitioning, etc.

Sorting is memory and compute intensive work, and many times we don't have enough compute or memory to do it.

In this post I will share 2 sorting algorithms that can be used when a system has memory or CPU constraints.

Disk based sorting

We have a file containing 1 TB of data and we want to sort it. The data is so huge that a standard in-memory sorting algorithm can't be used.

One way to sort such data is to split it into chunks, sort each chunk in memory, persist the chunk to disk, and finally merge the sorted chunks using a k-way merge algorithm.


At a high level the sort pipeline will look something like below



A nice thing about this algorithm is that it is embarrassingly parallel.

It is also a good example of a divide-and-conquer algorithm, and the technique can be applied in both stages.

The pipeline has 2 stages, and each can be executed in parallel to take advantage of multiple cores.

Let's assume that the input file contains 10 million records; in the split stage it is decomposed into sorted chunks.
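A minimal sketch of the split stage, assuming line-oriented text input sorted lexicographically. SplitStage, split and writeSortedChunk are hypothetical names, and chunks go to temp files purely for the demo.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SplitStage {
    // Reads the input in chunks of at most chunkSize lines, sorts each chunk in
    // memory and writes it to its own temp file. Only one chunk is held in memory.
    static List<Path> split(BufferedReader input, int chunkSize) throws IOException {
        List<Path> chunks = new ArrayList<>();
        List<String> buffer = new ArrayList<>(chunkSize);
        String line;
        while ((line = input.readLine()) != null) {
            buffer.add(line);
            if (buffer.size() == chunkSize) {
                chunks.add(writeSortedChunk(buffer));
            }
        }
        if (!buffer.isEmpty()) {
            chunks.add(writeSortedChunk(buffer)); // last, partially filled chunk
        }
        return chunks;
    }

    private static Path writeSortedChunk(List<String> buffer) throws IOException {
        Collections.sort(buffer);
        Path file = Files.createTempFile("chunk-", ".txt");
        Files.write(file, buffer);
        buffer.clear(); // reuse the buffer for the next chunk
        return file;
    }

    public static void main(String[] args) throws IOException {
        BufferedReader input = new BufferedReader(new StringReader("d\nb\na\nc\ne\n"));
        for (Path chunk : split(input, 2)) {
            System.out.println(Files.readAllLines(chunk));
        }
        // prints [b, d] then [a, c] then [e]
    }
}
```

Each chunk is independent of the others, which is what makes this stage easy to run in parallel.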


In the merge stage we do the reverse operation: take multiple files and create a single one.




Split and merge have different compute/disk requirements, and it is possible to make both stages parallel or just one, depending on the constraint.

The overall sort pipeline will look like below.



This algorithm is used by many databases to manage result sets that don't fit into memory.

The important logic in this algorithm is the K-way merge of sorted results. If K is 2 then it is straightforward.

2 Way merge

The merge process picks the head of both iterators, adds the smaller value, and advances the iterator whose value was added.

Some edge conditions need handling: reading past the end of an input, and iterators of different sizes.


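// Iterator semantics as used here (inferred from the K way merge code below):
// next() advances, hasNext() reports whether the iterator now points at a
// value, and value() returns that value.
v1.next();
v2.next();

while (v1.hasNext() && v2.hasNext()) {
    value1 = v1.value();
    value2 = v2.value();
    if (isLessThan(value1, value2)) {
        buffer.add(value1);
        v1.next();
    } else {
        buffer.add(value2);
        v2.next();
    }
}

// One input is exhausted: copy whatever remains of the other.
if (v1.hasNext()) {
    append(buffer, v1);
} else if (v2.hasNext()) {
    append(buffer, v2);
}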
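For a version that runs as-is, here is a small sketch of the same 2 way merge over in-memory lists instead of the custom iterators. TwoWayMerge is a hypothetical name, and both inputs are assumed to be sorted already.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TwoWayMerge {
    // Merges two already sorted lists into one sorted list.
    static List<Integer> merge(List<Integer> left, List<Integer> right) {
        List<Integer> buffer = new ArrayList<>(left.size() + right.size());
        int i = 0;
        int j = 0;
        // Pick the smaller head and advance only the side it came from.
        while (i < left.size() && j < right.size()) {
            if (left.get(i) < right.get(j)) {
                buffer.add(left.get(i++));
            } else {
                buffer.add(right.get(j++));
            }
        }
        // At most one of these copies anything: the leftover tail of the longer input.
        buffer.addAll(left.subList(i, left.size()));
        buffer.addAll(right.subList(j, right.size()));
        return buffer;
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 10, 11)));
        // [1, 2, 3, 4, 9, 10, 11]
    }
}
```

The index bounds in the loop condition cover both edge conditions: an exhausted input and inputs of different sizes.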

K Way merge

Assume K is 4. One way to merge is to split the inputs into pairs, merge each pair, and then merge the outputs of those 2 way merges. This works, but every intermediate result has to be written and read again, and it can't take advantage of batching multiple iterators.

The recommended way is to use a heap of K values. This is more efficient because all inputs are processed in a single pass, which also reduces IO overhead.

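// The heap is ordered by each iterator's current value, so poll() returns the
// iterator holding the smallest pending value.
PriorityQueue<LineIterator> heap=...
LineIterator itr;

while ((itr = heap.poll()) != null) {
    write(writer, itr.value());
    itr.next();
    // Put the iterator back only if it still has a value to offer.
    if (itr.hasNext()) {
        heap.add(itr);
    }
}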
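The snippet above depends on the LineIterator class from the full source. As a self-contained sketch of the same idea, the version below merges in-memory sorted lists that stand in for chunk files; KWayMerge and Cursor are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

class KWayMerge {
    // Hypothetical cursor: an iterator plus the value it currently points at.
    private static final class Cursor {
        final Iterator<String> source;
        String current;

        Cursor(Iterator<String> source) {
            this.source = source;
            this.current = source.next();
        }

        // Moves to the next value; returns false once the source is exhausted.
        boolean advance() {
            if (!source.hasNext()) {
                return false;
            }
            current = source.next();
            return true;
        }
    }

    // Merges K sorted lists in a single pass using a heap of K cursors.
    static List<String> merge(List<List<String>> sortedInputs) {
        PriorityQueue<Cursor> heap =
                new PriorityQueue<>(Comparator.comparing((Cursor c) -> c.current));
        for (List<String> input : sortedInputs) {
            if (!input.isEmpty()) {
                heap.add(new Cursor(input.iterator()));
            }
        }
        List<String> out = new ArrayList<>();
        Cursor cursor;
        while ((cursor = heap.poll()) != null) {
            out.add(cursor.current);
            // The cursor is outside the heap while it changes, so re-adding is safe.
            if (cursor.advance()) {
                heap.add(cursor);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList(
                Arrays.asList("a", "d", "g"),
                Arrays.asList("b", "e"),
                Arrays.asList("c", "f"))));
        // [a, b, c, d, e, f, g]
    }
}
```

The heap never holds more than K entries, so memory use depends on the number of chunks and not on their size.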

BitMap Sorting

A bitmap is a powerful data structure for searching, and it has some interesting properties for sorting too.

Consider a scenario where the file contains n positive integers and each value is less than K.

K can be really huge. To put this in context, 256 MB of memory holds about 2 billion bits, which is enough to sort every distinct non-negative int value.

The idea is based on an allocated array in which every element is a fixed-size word (i.e. 32 or 64 bits). With 32 bit words, 32 values can be stored in every slot. The total capacity of this data structure is 32 * len(array).

Setting a bit needs 2 pieces of information: the slot in the array and the position within that slot.
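To make the arithmetic concrete, here is a small worked example for 32 bit words. BitAddress and its method names are hypothetical and exist only to show the calculation.

```java
class BitAddress {
    static final int WORD_SIZE = 32;
    static final int SLOT_SHIFT = 5; // log2(32)

    // Array slot that holds value v: v / 32.
    static int slot(int v) {
        return v >>> SLOT_SHIFT;
    }

    // Bit index of v inside its slot: v % 32.
    static int bitIndex(int v) {
        return v & (WORD_SIZE - 1);
    }

    // Mask with only that bit set.
    static int mask(int v) {
        return 1 << bitIndex(v);
    }

    // Number of distinct values that fit in the given amount of memory.
    static long capacityInBits(long bytes) {
        return bytes * 8;
    }

    public static void main(String[] args) {
        System.out.println(slot(70));     // 2
        System.out.println(bitIndex(70)); // 6
        System.out.println(mask(70));     // 64
        System.out.println(capacityInBits(256L * 1024 * 1024)); // 2147483648
    }
}
```

So value 70 lives in slot 2 at bit 6, and 256 MB covers 2^31 values, which is the full non-negative int range.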




Bit fiddling makes it possible to pack multiple values into a single word; if you want to read more about it, refer to bit-fiddling.

In this example the number of bytes is 4 and the word size is 32.

Checking for a value is straightforward: it involves doing a bitwise & on the slot value.

Complete working code

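public class BitMapSort {

    public static final int NO_OF_BYTE = 4;
    private static final int WORD_SIZE = 8 * NO_OF_BYTE;
    // log2(WORD_SIZE), i.e. 5 for 32 bit words.
    private static final int SLOT_SHIFT = Integer.numberOfTrailingZeros(WORD_SIZE);
    private final int[] values;
    private final int maxValue;

    public BitMapSort(int maxValue) {
        this.maxValue = maxValue;
        this.values = new int[1 + maxValue / WORD_SIZE];
        logUsageInfo(maxValue);
    }

    public void set(int v) {
        this.values[slot(v)] |= position(v);
    }

    public boolean check(int v) {
        int value = this.values[slot(v)] & position(v);
        return value != 0;
    }

    // Bit mask for v inside its slot: v mod WORD_SIZE selects the bit.
    public int position(int v) {
        return 1 << (v & (WORD_SIZE - 1));
    }

    // Index of the array slot that holds v: v / WORD_SIZE.
    public int slot(int v) {
        return v >> SLOT_SHIFT;
    }

    // Body not shown in this post; assumed here to report the memory allocated.
    private void logUsageInfo(int maxValue) {
        System.out.printf("Max value %d needs %d bytes%n",
                maxValue, (long) values.length * NO_OF_BYTE);
    }
}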


Whole pipeline will look something like below
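As a rough sketch of that pipeline, the example below loads the values into a bitmap and then reads the set bits back in ascending order. It swaps in the JDK's java.util.BitSet for the BitMapSort class above, and BitSetSortPipeline is a hypothetical name.

```java
import java.util.Arrays;
import java.util.BitSet;

class BitSetSortPipeline {
    // Sorts distinct non-negative ints that are all <= maxValue.
    static int[] sort(int[] input, int maxValue) {
        BitSet bits = new BitSet(maxValue + 1);
        // Load stage: one bit per value; the input order does not matter.
        for (int v : input) {
            bits.set(v);
        }
        // Read stage: BitSet.stream() yields the set bit indices in ascending order.
        return bits.stream().toArray();
    }

    public static void main(String[] args) {
        int[] sorted = sort(new int[] {9, 3, 7, 1}, 10);
        System.out.println(Arrays.toString(sorted)); // [1, 3, 7, 9]
    }
}
```

No comparisons happen anywhere: the position of each bit is the sort order.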


Trade Off


Nothing is perfect; this approach has some constraints that are good to be aware of.

- This is a dense value data structure, so to store a value of 100 million we have to allocate at least 100 million bits (about 12 MB), however few values are actually present. If the values are sparse, find an alternative data structure.

- Thread safety has to be handled at slot level, because 32 values are packed into a single slot.

- Values should be distinct. If only a few duplicates are expected, an additional data structure such as a map can keep a frequency count. This needs a little care: set a threshold on the number of duplicate values, and stop accepting values once it is crossed, to avoid everything ending up in the map.

- Iteration. Since this is a compressed representation of dense values, iterating over the stored values should be done in a streaming fashion to avoid allocating a huge in-memory collection. One approach is an API that hands over a single value at a time and lets the client decide what to do with it. Such an iteration could look something like below

public void consume(IntConsumer consumer) {
    // rangeClosed so that maxValue itself is visited; the array has room for it.
    IntStream
            .rangeClosed(1, maxValue)
            .filter(this::check)
            .forEach(consumer);
}

- Range iteration. This data structure is very good for range queries.

- Compact set. This is also a good data structure for set operations.
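For the slot level thread safety point above, one possible approach is to back the bitmap with an AtomicIntegerArray, so that each slot update is an atomic read-modify-write. This is a sketch under that assumption and not the code from the full source; ConcurrentBitMap is a hypothetical name.

```java
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.stream.IntStream;

class ConcurrentBitMap {
    private static final int WORD_SIZE = 32;
    private static final int SLOT_SHIFT = 5; // log2(32)
    private final AtomicIntegerArray values;

    ConcurrentBitMap(int maxValue) {
        this.values = new AtomicIntegerArray(1 + maxValue / WORD_SIZE);
    }

    void set(int v) {
        int slot = v >>> SLOT_SHIFT;
        int mask = 1 << (v & (WORD_SIZE - 1));
        // Atomic OR: concurrent writers to the same slot cannot lose each other's bits.
        values.accumulateAndGet(slot, mask, (current, m) -> current | m);
    }

    boolean check(int v) {
        int mask = 1 << (v & (WORD_SIZE - 1));
        return (values.get(v >>> SLOT_SHIFT) & mask) != 0;
    }

    public static void main(String[] args) {
        ConcurrentBitMap bitmap = new ConcurrentBitMap(1_000);
        // Many threads write neighbouring values, so they share slots all the time.
        IntStream.range(0, 1_000).parallel().forEach(bitmap::set);
        System.out.println(bitmap.check(999));   // true
        System.out.println(bitmap.check(1_000)); // false
    }
}
```

A plain `values[slot] |= mask` on an int[] is a separate read and write, so two threads setting different bits in the same slot could overwrite each other.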

Conclusion

These are simple yet very powerful algorithms, and when they fit the bill they can be the difference between solving the problem and not solving it at all.

Thursday, 25 July 2013

Which memory is faster Heap or ByteBuffer or Direct ?

Java is becoming the new C/C++; it is extensively used for developing high performance systems.
Good for millions of Java developers like me :-)

In this blog I will share my experiments with the different types of memory allocation that can be done in Java, and the benefit you get from each.


Memory Allocation In Java
What kind of support does Java provide for memory allocation?

- Heap Memory
I don't think I have to explain this; every Java application starts with it. All objects allocated using the "new" keyword go in heap memory.

- Non Direct ByteBuffer
It is a wrapper over a byte array, just a flavor of heap memory.
ByteBuffer.allocate() can be used to create this type of object; it is very useful if you want to deal in bytes rather than objects.

- Direct ByteBuffer
This is the real stuff, available in Java since JDK 1.4.
Description of direct ByteBuffer from the Javadoc:

"A direct byte buffer may be created by invoking the allocateDirect factory method of this class. The buffers returned by this method typically have somewhat higher allocation and deallocation costs than non-direct buffers. The contents of direct buffers may reside outside of the normal garbage-collected heap, and so their impact upon the memory footprint of an application might not be obvious. It is therefore recommended that direct buffers be allocated primarily for large, long-lived buffers that are subject to the underlying system's native I/O operations. In general it is best to allocate direct buffers only when they yield a measureable gain in program performance."

Important things to note about a direct buffer:
 - Its contents live outside the JVM heap.
 - Its contents are out of the garbage collector's reach.

These are very important if you care about performance.
Memory-mapped files are also a flavor of direct byte buffer; I shared some of my findings on those in the blogs below

arraylist-using-memory-mapped-file
power-of-java-memorymapped-file
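The heap and direct ByteBuffer flavors described above share one API and can be told apart at runtime. A minimal sketch, where BufferKinds and its two factory methods are hypothetical names:

```java
import java.nio.ByteBuffer;

class BufferKinds {
    // Non direct buffer: a wrapper over a byte[] on the Java heap.
    static ByteBuffer heapBuffer(int size) {
        return ByteBuffer.allocate(size);
    }

    // Direct buffer: its contents may live outside the garbage collected heap.
    static ByteBuffer directBuffer(int size) {
        return ByteBuffer.allocateDirect(size);
    }

    public static void main(String[] args) {
        ByteBuffer heap = heapBuffer(64);
        ByteBuffer direct = directBuffer(64);
        System.out.println(heap.isDirect());   // false
        System.out.println(heap.hasArray());   // true, backed by an accessible byte[]
        System.out.println(direct.isDirect()); // true
    }
}
```

Because both are plain ByteBuffer instances, the same read/write code can be pointed at either kind of memory.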

- Off Heap or Direct Memory
This is almost the same as a direct ByteBuffer, with a small difference: it is allocated with unsafe.allocateMemory. As it is direct memory, it creates no GC overhead. This type of memory must be released manually.

In theory Java programmers are not supposed to do such allocation, and I think the reasons could be
 - It is complex to manipulate this type of memory, because you are dealing only with bytes, not objects
 - The C/C++ community will not like it :-)

Let's take a deep dive into memory allocation

For the memory allocation test I will use a 13 byte message, broken down into
 - int - 4 byte
 - long - 8 byte
 - byte - 1 byte

I will only test write/read performance; I am not testing memory consumption or allocation speed.
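To show what writing and reading such a message involves, here is a minimal sketch of the 13 byte layout on top of ByteBuffer. It is not the benchmark code: MessageCodec and the field names id, amount and flag are assumptions made for illustration.

```java
import java.nio.ByteBuffer;

class MessageCodec {
    static final int MESSAGE_SIZE = 4 + 8 + 1; // int + long + byte = 13 bytes

    // Writes the message at the given position using absolute offsets.
    static void write(ByteBuffer buffer, int index, int id, long amount, byte flag) {
        int offset = index * MESSAGE_SIZE;
        buffer.putInt(offset, id);          // bytes 0..3
        buffer.putLong(offset + 4, amount); // bytes 4..11
        buffer.put(offset + 12, flag);      // byte 12
    }

    static int readId(ByteBuffer buffer, int index) {
        return buffer.getInt(index * MESSAGE_SIZE);
    }

    static long readAmount(ByteBuffer buffer, int index) {
        return buffer.getLong(index * MESSAGE_SIZE + 4);
    }

    static byte readFlag(ByteBuffer buffer, int index) {
        return buffer.get(index * MESSAGE_SIZE + 12);
    }

    public static void main(String[] args) {
        int messages = 3;
        // The same code works for heap (allocate) and direct (allocateDirect) buffers.
        ByteBuffer buffer = ByteBuffer.allocateDirect(messages * MESSAGE_SIZE);
        for (int i = 0; i < messages; i++) {
            write(buffer, i, i, i * 100L, (byte) 1);
        }
        System.out.println(readId(buffer, 2));     // 2
        System.out.println(readAmount(buffer, 2)); // 200
        System.out.println(readFlag(buffer, 2));   // 1
    }
}
```

With plain heap objects the JVM handles this layout for you; with byte oriented memory the offsets have to be managed by hand.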

Write Performance



Chart: write throughput, 5 million objects (X axis: reading number, Y axis: operations/second in millions)

5 million 13 byte objects are written using the 4 types of allocation.
Direct ByteBuffer and off heap are best in this case; throughput is close to 350 million/sec.
Normal ByteBuffer is very slow; throughput is just 85 million/sec.
Direct/off heap is around 1.5X faster than heap.


I did the same test with 50 million objects to check how it scales; below is the graph for that.

Chart: write throughput, 50 million objects (X axis: reading number, Y axis: operations/second in millions)

The numbers are almost the same as for 5 million.

Read Performance

Let's look at read performance.

Chart: read throughput (X axis: reading number, Y axis: operations/second in millions)

This number is interesting: off heap is blazing fast, with throughput of 12,000 million/sec :-)
The only close one is heap read, which is around 6X slower than off heap.

Look at direct ByteBuffer: it tanks at just 400 million/sec, and I am not sure why.

Let's have a look at the numbers for 50 million objects.

Chart: read throughput, 50 million objects (X axis: reading number, Y axis: operations/second in millions)

Not much different.

Conclusion
Off heap via Unsafe is blazing fast, with 330 million/sec for writes and 11,200 million/sec for reads.
Every other type of allocation is good for either read or write; none of them is good for both.
A special note about the normal ByteBuffer: it is pathetic, and I am sure you will not use it after seeing these numbers.
Direct ByteBuffer sucks at read speed; I am not sure why it is so slow.

So if memory read/write is becoming a bottleneck in your system, then off heap is definitely the way to go. Remember it is a highway, so drive with care.

Code is available @ github

Update
Fixing broken code link - code available at github