Lets Heapify!!!
Heap is very popular data structure used for solving Top X types of problem.
For eg find the top 10 popular items by sales volume, top X users by activity etc.
PriorityQueue data structure of java is based on heap and can help in answering any top X type of query. PriorityQueue is not thread safe, so it can't be used in highly concurrent environment without adding lock.
Underlying data structure of heap is array and elements are shifted up and down to maintain the element order, for each swim operation the full array should be locked down to avoid race condition.
Even read options like poll are mutating operation due to which it is hard to share Heap with multiple threads.
Underlying algorithm makes is very hard to use heap in concurrent or parallel environment.
Heap - Source: Wikipedia |
Lets look at other options to achieve heap like functionality without giving up on concurrency.
Concurrent heap data structure need following properties
- Highly concurrent ordered collection.
- Parallel writes/read support.
- Top X type of API.
- Multiple top operations supported concurrently using same instance of data structure.
Concurrent Skip list from JDK looks good candidate for this but we need to add some missing functionality.
Lets recap how Skip List data structure looks.
SkipList - Source:Wikipedia |
SkipList is ordered multiple link list, it has got some fast lanes and slow lanes. Fast lanes allow to find element in approx log(n) cost.
SalesItem is Comparable and it compares items by sales volume.
class SalesItem implements Comparable<SalesItem> {
private final String product;
private final long sales;
@Override
public int compareTo(SalesItem o) {
return Long.compare(sales, o.sales);
}
}
We can't add SalesItem in SkipList because items having same sales volume will be rejected.
We can add another wrapper class that adds extra metadata to handle this problem. It will look something like this
class Item implements Comparable<Item> {
private final T value;
private final long index;
@Override
public int compareTo(Item o) {
int r = this.value.compareTo(o.value);
r = heapType.equals(HeapType.Max) ? -r : r;
if (r != 0) {
return r;
}
return Long.compare(index, o.index);
}
index is that extra metadata that is added to handle items with same sales volume and it case of conflict it will order by index
- TopX API
For TopX API streams.limit can be used, another benifit of using streams APIs is that client application can use other cool features of Streams API.
Full Code for Concurrent Heap
public class ConcurrentHeap<T extends Comparable> {
private final AtomicLong id = new AtomicLong();
private final NavigableSet<Item> data = new ConcurrentSkipListSet<>();
private final HeapType heapType;
public void add(T value) {
data.add(new Item(value, id.incrementAndGet()));
}
public Stream<T> stream() {
return data
.stream()
.map(v -> v.value);
}
public Stream<T> top(int x) {
return stream().limit(x);
}
class Item implements Comparable<Item> {
private final T value;
private final long index;
@Override
public int compareTo(ConcurrentHeap<T>.Item o) {
int r = this.value.compareTo(o.value);
r = heapType.equals(HeapType.Max) ? -r : r;
if (r != 0) {
return r;
}
return Long.compare(index, o.index);
}
Item(T value, long index) {
this.value = value;
this.index = index;
}
}
Underlying data structure that is behaving like Heap is NavigableSet, JDK has 2 implementation if this first one is TreeSet and another one is ConcurrentListSkipSet.
We can choose between TreeSet/ConcurrentListSkipSet based on need to avoid the cost of concurrency in single thread env.
Full working code for this blog post is available @ github