Tuesday, 15 March 2022

Concurrent Heap data structure

Lets Heapify!!!




A heap is a very popular data structure for solving top-X problems.

For example: find the top 10 items by sales volume, the top X users by activity, etc.

Java's PriorityQueue is based on a heap and can answer any top-X type of query. PriorityQueue is not thread safe, so it can't be used in a highly concurrent environment without adding a lock.

The underlying data structure of a heap is an array, and elements are shifted up and down to maintain the heap order. For each swim or sink operation the full array has to be locked to avoid race conditions.

Even read operations like poll mutate the heap, which makes it hard to share a heap between multiple threads.
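To make the mutating-read point concrete, here is a minimal sketch (the class and method names are made up for illustration). Reading the top X elements from a PriorityQueue in order means polling them, so the sketch works on a private copy. In a concurrent setting that copy would itself have to be taken under the same lock as the writers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

final class PriorityQueueTopX {

    // Returns the x smallest elements in order. poll() removes elements,
    // so we drain a copy and leave the caller's queue untouched.
    static List<Integer> topX(PriorityQueue<Integer> queue, int x) {
        PriorityQueue<Integer> copy = new PriorityQueue<>(queue);
        List<Integer> result = new ArrayList<>();
        while (result.size() < x && !copy.isEmpty()) {
            result.add(copy.poll());
        }
        return result;
    }

    public static void main(String[] args) {
        PriorityQueue<Integer> sales = new PriorityQueue<>(Arrays.asList(40, 10, 30, 20));
        System.out.println(topX(sales, 2)); // [10, 20]
        System.out.println(sales.size());   // 4, the original queue is unchanged
    }
}
```

Copying the whole queue for every read is exactly the kind of cost we want to avoid, which is why the rest of the post looks at a different data structure.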

The underlying algorithm makes it very hard to use a heap in a concurrent or parallel environment.

Heap - Source: Wikipedia

Let's look at other options to achieve heap-like functionality without giving up on concurrency.

A concurrent heap data structure needs the following properties:

  • Highly concurrent ordered collection. 
  • Parallel writes/read support.
  • Top X type of API.
  • Multiple top operations supported concurrently using same instance of data structure.

The concurrent skip list from the JDK looks like a good candidate for this, but we need to add some missing functionality.

Let's recap what the skip list data structure looks like.


SkipList - Source:Wikipedia

A skip list is an ordered, multi-level linked list with fast lanes and slow lanes. The fast lanes allow an element to be found at approximately O(log n) cost.


- Unique Item identification 

The JDK has set and map implementations of the skip list (ConcurrentSkipListSet and ConcurrentSkipListMap). A map/set can only hold unique keys, so we need a way to work around the unique-key requirement to make the set behave like a heap.

We can use a little trick: every value that is added to the skip list carries additional metadata, which can be a running sequence number or a timestamp. This extra metadata is used to break ties when 2 items are equal based on comparison.

Let's take the products-by-sales use case for the code samples.

SalesItem is Comparable and compares items by sales volume.

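class SalesItem implements Comparable<SalesItem> {

    private final String product;
    private final long sales;

    // Constructor added so the final fields are initialised.
    SalesItem(String product, long sales) {
        this.product = product;
        this.sales = sales;
    }

    // Items are ordered by sales volume only.
    @Override
    public int compareTo(SalesItem o) {
        return Long.compare(sales, o.sales);
    }
}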

We can't add SalesItem to the skip list directly, because items with the same sales volume will be rejected as duplicates.
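A quick sketch of the rejection, assuming a trimmed-down copy of SalesItem (the demo class name is made up). A sorted set treats two elements as the same when compareTo returns 0, so the second product with the same sales volume never makes it in.

```java
import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;

final class DuplicateSalesDemo {

    // Trimmed-down copy of SalesItem, compared by sales volume only.
    static final class SalesItem implements Comparable<SalesItem> {
        final String product;
        final long sales;

        SalesItem(String product, long sales) {
            this.product = product;
            this.sales = sales;
        }

        @Override
        public int compareTo(SalesItem o) {
            return Long.compare(sales, o.sales);
        }
    }

    // Adds two different products with the same sales volume and
    // reports whether the second add was accepted by the set.
    static boolean secondAddAccepted() {
        NavigableSet<SalesItem> set = new ConcurrentSkipListSet<>();
        set.add(new SalesItem("phone", 100));
        return set.add(new SalesItem("laptop", 100));
    }

    public static void main(String[] args) {
        System.out.println(secondAddAccepted()); // false
    }
}
```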

We can add a wrapper class that carries the extra metadata to handle this problem. It will look something like this:


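// Inner class of the heap, so T and heapType come from the enclosing class.
// The constructor is omitted here; it is shown in the full code further down.
class Item implements Comparable<Item> {
    private final T value;
    private final long index;

    @Override
    public int compareTo(Item o) {
        int r = this.value.compareTo(o.value);
        // Flip the order for a max heap.
        r = heapType.equals(HeapType.Max) ? -r : r;
        if (r != 0) {
            return r;
        }
        // Equal values: fall back to insertion order.
        return Long.compare(index, o.index);
    }
}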

index is the extra metadata added to handle items with the same sales volume; in case of a tie, items are ordered by index.

 - TopX API

For the TopX API, Stream.limit can be used. Another benefit of using streams is that the client application can use the other cool features of the Streams API.

Full Code for Concurrent Heap

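import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class ConcurrentHeap<T extends Comparable<? super T>> {

    // Assumed definition; the post uses HeapType without showing it.
    public enum HeapType { Min, Max }

    // Running sequence number used to break ties between equal values.
    private final AtomicLong id = new AtomicLong();
    private final NavigableSet<Item> data = new ConcurrentSkipListSet<>();
    private final HeapType heapType;

    // Constructor was not shown in the post; added so heapType is initialised.
    public ConcurrentHeap(HeapType heapType) {
        this.heapType = heapType;
    }

    public void add(T value) {
        data.add(new Item(value, id.incrementAndGet()));
    }

    // Elements in heap order (ascending for Min, descending for Max).
    public Stream<T> stream() {
        return data
                .stream()
                .map(v -> v.value);
    }

    public Stream<T> top(int x) {
        return stream().limit(x);
    }

    class Item implements Comparable<Item> {
        private final T value;
        private final long index;

        Item(T value, long index) {
            this.value = value;
            this.index = index;
        }

        @Override
        public int compareTo(Item o) {
            int r = this.value.compareTo(o.value);
            r = heapType.equals(HeapType.Max) ? -r : r;
            if (r != 0) {
                return r;
            }
            // Equal values: fall back to insertion order.
            return Long.compare(index, o.index);
        }
    }
}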


The underlying data structure that behaves like a heap is NavigableSet. The JDK has 2 implementations of it: TreeSet and ConcurrentSkipListSet.

We can choose between TreeSet and ConcurrentSkipListSet based on need, to avoid the cost of concurrency in a single-threaded environment.
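One way to make that choice explicit is a small factory. This is only a sketch; the class name and the boolean flag are assumptions and not part of the code on GitHub.

```java
import java.util.NavigableSet;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;

final class OrderedSets {

    // Picks the backing set: a skip list when threads share the heap,
    // a plain TreeSet when everything happens on a single thread.
    static <E extends Comparable<? super E>> NavigableSet<E> create(boolean concurrent) {
        if (concurrent) {
            return new ConcurrentSkipListSet<E>();
        }
        return new TreeSet<E>();
    }

    public static void main(String[] args) {
        NavigableSet<Integer> single = create(false);
        single.add(3);
        single.add(1);
        System.out.println(single.first()); // 1
    }
}
```

Since both implementations share the NavigableSet interface, the rest of the heap code does not need to know which one it got.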

 

Full working code for this blog post is available @ github 

Friday, 7 May 2021

What is Artificial Intelligence ?

This is a post from a series on artificial intelligence and machine learning.

In this post, we will try to understand what AI is and where machine learning fits in it.




As per Wikipedia, Artificial Intelligence is

Simulating any intellectual task.

It can be also seen as the industrial revolution to simulate the brain.

AI is a very broad field with many subfields. It is important to understand what the full landscape looks like, and then focus on the core part that overlaps with almost every subfield of AI.




Let's try to understand each subfield.

Knowledge representation

This is core to many AI applications. It is based on expert systems that collect explicit knowledge which is available in some database or possessed by experts.
This can also be seen as knowledge about knowledge. We interact with this type of system every day, be it Amazon Alexa, Apple Siri, or Google Assistant.
 

Perception

Machine perception is about using sensor input to understand the context and decide which action to take. Nowadays we are surrounded by cameras, microphones, IoT devices, etc.

Some real-world applications include facial recognition, computer vision, speech, etc.

Motion and manipulation

This is one of the heaviest uses of AI, and it includes robotics. The industrial revolution has already helped the world economy grow, and robotics will take it to the next level. Some applications are industrial and domestic robots. In the time of pandemics like Covid, robotics is going to help even more, as everyone is concerned about safety. Autonomous vehicles are one of the important applications of this subfield.

Natural language processing

NLP allows the machine to read and understand human language. It includes processing huge amounts of unstructured data and deriving meaning from it. Some of the applications that we interact with every day are search autocomplete, auto-correction, language translators, chatbots, targeted advertisements, etc.

Search and planning

This area covers machines that are given a goal and work to achieve it. The machine builds a state of the world and can make predictions about how its actions will change it.

Learning

This is also called Machine Learning, and it is the study of computer algorithms that automatically improve through experience.
It sounds like how humans learn something!

It is a subfield of AI, but the most important one, as it is applied to all the other subfields. Knowing it is a must before starting on any other subfield of AI.






Let's explore more on the Learning part now.

What is machine learning? 


 

One of the quick definitions of machine learning is pattern recognition. It can also be seen as how computers can discover how to solve problems without explicit programming.

Machine learning is made up of 3 steps.



The step of updating the model via learning is where real machine learning happens. 


Data science is related to machine learning, and is often mistaken for being only machine learning. AI and data science both have a good overlap with machine learning; it can be seen as below.



Where does data science fit in with machine learning? It is the unified concept of statistics, maths, data mining, data analysis, etc.


Data Science


Now, with a high-level understanding of AI, ML & data science, we are ready to do a deep dive into ML.






Artificial Intelligence and machine learning

This post contains a catalog of high-level concepts in AI & ML.




I will keep on updating as I write more stuff

Saturday, 10 April 2021

Timeless investing lesson

I usually don't share my thoughts about investment via the blog but thought it would be useful to record the most important investing lesson so that I can come back and refer to it.



The last few weeks have been interesting in the stocks investment world, some of the recent events reinforce timeless investment advice that people only learn by making mistakes.


In this post, I will share 2 such pieces of advice with very recent examples. 





Never trade on leverage

This is the number 1 reason why people lose money, and it is also what differentiates a gambler (i.e. a trader) from an investor.


Archegos hedge fund took down 4 major investment banks in March 2021.


Let's try to understand leverage before we get into how Archegos crashed and burned.


Meaning of leverage

"use borrowed capital for (an investment), expecting the profits made to be greater than the interest payable."


The first exposure to leverage that most people get is via a housing loan: the bank funds part of the house price and the borrower pays interest on the amount that is borrowed.


I would say leverage via a house loan is good leverage, because it puts a roof over your head and also has a good chance of price appreciation.

One of the most important things about a house loan is that the tenor of payment is fixed and not linked to the underlying asset value.


If the house price falls by 50% then the borrower doesn't have to pay more to the bank, but he can gain if the price goes up by 50%. I am not saying every house loan is good leverage; it is more of an on-the-fence type of thing.


Let's see how leverage works in the investment world.


In the trading/investment world leverage is called margin. The trading broker will allow x times leverage to clients, on the condition that whenever the broker issues a margin call the investor has to deposit more money or sell some securities.

 


Another example to understand this: assume we have 1 Million and we get 10X leverage. This means that we can buy securities worth 10 Million.






Only 2 things can happen, and in the short term each has a probability of roughly 50-50.


The bet goes your way


This is the happy scenario where the market value of the securities goes up by 20%. You are happy, and you can also pay back some of the leverage to reduce risk.


The market is cruel to you


This is what happens in most of the scenarios, and you lose 20%. In this scenario you lose your 1 Million and an additional 1 Million. Since you will not have the cash to fulfill the margin call, you will start selling securities or take on more leverage. Whichever option is selected, it will cause panic in the market and trigger more selloff.
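The arithmetic behind both scenarios can be written down in a few lines. This is a simplified sketch that ignores interest, fees and margin rules; the class and method names are made up for illustration.

```java
final class LeverageMath {

    // Equity left after a leveraged position moves by changePercent.
    // ownCapital * leverage is the size of the position; the whole
    // profit or loss on that position lands on the investor's own capital.
    static long equityAfterMove(long ownCapital, int leverage, int changePercent) {
        long position = ownCapital * leverage;
        long profitOrLoss = position * changePercent / 100;
        return ownCapital + profitOrLoss;
    }

    public static void main(String[] args) {
        System.out.println(equityAfterMove(1_000_000, 10, 20));  // 3000000
        System.out.println(equityAfterMove(1_000_000, 10, -20)); // -1000000
    }
}
```

A 20% move on a 10 Million position is 2 Million either way. On the way down that wipes out the original 1 Million and leaves the investor owing another 1 Million.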


Now, this is exactly what happened with the Archegos hedge fund, because a bet on one of the Chinese companies did not go their way.



Archegos is a family-run business and the owner does not have a great reputation. They took a few things to the extreme and failed big.


- Took leverage of around 500 times.

- Used Contracts for Difference (CFDs), which are very high leverage instruments and are banned in many markets. Many individual investors are not allowed by regulators to trade CFDs.



Now, how does anyone get 500 times leverage?

They used a number of brokers to get leverage, and these are big names like Goldman Sachs, Morgan Stanley, Deutsche Bank, Credit Suisse and Nomura Holdings.


None of these brokers were aware that Archegos was operating on such high margin. On top of that, the risk management processes of these banks were not up to the mark and did not raise a warning signal before it was too late.


Look at the stock prices of these brokers when this thing came out.




Goldman Sachs & Morgan Stanley were smart enough to recover their losses, but Credit Suisse and Nomura were caught off guard.


Billions of dollars were wiped out. Some of the numbers coming in the news are 10 Billion, but many experts feel that it could be 100 Billion. Many of these banks have already declared that they are going to report losses in the next quarter, and it will also result in some people losing their jobs.


As I write this post, the stock price has dropped more :-(


This is not the only example where a big leveraged bet has gone wrong. The 2008 subprime crisis was also due to leverage, and the common man was directly impacted by that.


Keep an eye on this news to understand the real impact.


As an individual investor, never trade on leverage.


Markets are efficient 


There are 2 popular styles of investing: growth and value. Warren Buffett is a value investor, and many companies and people try to follow him.

It is hard to achieve anything close to Mr. Buffett, because to become Warren you need the temperament and patience of Warren.

Many investors try to pick stocks using value investing techniques, and when they fail they try to pick a fund manager who can do value investing for them.

Trust me, picking a stock or a fund manager is like flipping a coin, and the downside probability is very high.



On March 10, 2021, International Value Advisers (IVA) decided to liquidate its funds.





IVA was an esteemed value shop, and it met the sad and common end of actively managed funds.


IVA was sitting on cash for a very long time because they thought the market was expensive. There was a time when the fund had 50% in cash waiting to be deployed.


They got the asset allocation wrong and tried to time the market.

The efficient market hypothesis states that share prices reflect all information and consistent alpha generation is impossible.


Every now and then someone will come along, say that the market is inefficient, and try to fight against it.


That it doesn't work has been shown multiple times. A nice article about it was posted on Forbes; it is called "Any Monkey Can Beat the Market".


IVA investors would have made lots of money by just investing in a broad market index fund. 

Morningstar has done a nice analysis of the IVA fund, read this to understand in detail what went wrong. 



If you can't beat the market then be the market. An index fund should be the core strategy.





I will leave you with one more interesting read, "Warren Buffett Just Won a 10-Year Million-Dollar Bet", where he challenges hedge fund managers to beat the market and they end up losing and have to close the fund.







I will wrap up now, but if you want to remember one thing then it has to be "never trade on leverage".


Sunday, 4 April 2021

Data partitioning approach.

Partitioning is the technique of breaking a large data set into small manageable chunks.

It is important to define the partition strategy such that a single piece of information belongs to a single partition only. A good partition strategy enables data-level parallelism.

Partitioning is not a new idea; it comes from the RDBMS world, where it is used for creating partitioned indexes.

Atomicity across partitions, especially for writes, is hard to achieve and requires some tradeoffs.

Partitioning should not be confused with replication. Replication is keeping multiple copies of a single partition for high availability or fault tolerance.

Partition + Replication makes any data system fully distributed.

In this post we will look at different approaches to data partitioning. Let's pick an email address dataset as the sample data for the partition examples.

Let's take the bill.gates@microsoft.com email as an example.

This email can be partitioned by hash(bill.gates@microsoft.com) or by range(bill.gates@microsoft.com), based on how it is queried.
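As a rough sketch of the two options (the class name, the number of partitions and the three alphabetical ranges are assumptions picked for illustration):

```java
final class EmailPartitioner {

    // Hash partitioning: spreads keys evenly but loses ordering.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int byHash(String email, int partitions) {
        return Math.floorMod(email.hashCode(), partitions);
    }

    // Range partitioning on the first character: keeps neighbouring
    // keys together, which helps range scans.
    static int byRange(String email) {
        char first = Character.toLowerCase(email.charAt(0));
        if (first <= 'h') {
            return 0; // everything up to 'h', digits included
        }
        if (first <= 'q') {
            return 1; // 'i' to 'q'
        }
        return 2;     // 'r' onwards
    }

    public static void main(String[] args) {
        String email = "bill.gates@microsoft.com";
        System.out.println(byHash(email, 4)); // some value between 0 and 3
        System.out.println(byRange(email));   // 0
    }
}
```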

Hash

Hash partitioning is the simplest to understand, but in a distributed environment it is not that simple, because nodes that store data can go offline and new nodes are added to manage load. Because the number of nodes is dynamic, we don't have a constant denominator for computing the mod of a key.

It is important to understand why a hash-table type of data structure can't be used for data partitioning in distributed systems.

A classic hash table starts with capacity N, and based on the load factor (i.e. how full it is) it will resize, and all the keys need to be rehashed based on the new capacity.
This resize is an expensive operation, and during the resize the hash table might not be available for some operations.

Resizing a distributed dataset means shuffling all the keys. When a dataset contains billions or trillions of keys, this generates a huge load on the system and also makes it unavailable for end users.
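A tiny experiment shows how bad this gets. The sketch below assumes integer keys placed by key % nodeCount (the key itself stands in for its hash) and counts how many keys land on a different node when the cluster grows from 4 to 5 nodes.

```java
final class ModuloRehash {

    // Counts keys whose node changes when the node count changes,
    // using plain modulo placement.
    static int movedKeys(int keyCount, int oldNodes, int newNodes) {
        int moved = 0;
        for (int key = 0; key < keyCount; key++) {
            if (key % oldNodes != key % newNodes) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println(movedKeys(1000, 4, 5)); // 800
    }
}
```

Adding a single node moves 80% of the keys, even though the new node only needs about 20% of them.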

If we look in terms of BIG O then classic hash table complexity will look something like below.



It is very clear that adding or removing a node is going to bring everything to a standstill. Some distributed systems like Kafka use a fixed number of partitions to avoid exactly this problem.

So what can be done in this case?
The key insight is to make each node responsible for a range of hash codes, which reduces the overhead of key migration when a node is added or removed.


This sets things up nicely for the next section.

Consistent Hashing

Consistent hashing is not a new idea; it is more than two decades old and was first described in a paper.

It is a special hashing scheme in which the key-to-node mapping changes minimally as nodes are added or removed. In this approach both keys and slots/nodes are hashed using the same function, which enables mapping nodes to intervals of keys.

All the nodes are placed on a ring and each node is responsible for a range of hashes. Due to this range property, whenever a node is added or removed only about k/n keys need to be relocated, where k is the total number of keys and n is the number of nodes or slots.




Each node is responsible for a range of keys, so when a node is added or removed only part of the keys need to be relocated. Let's look at an example.

Node removed




In this example node B is lost, node A takes over its responsibility and now handles the range 1 to 200. Only the keys from 101 to 200 need to move, which is much better than a naïve hash table.
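The same example can be replayed in code. This is a minimal sketch under a few assumptions: keys are the integers 1 to 300, each node is registered by the first key of its range, and a key belongs to the node with the closest start at or below it, so that A inherits B's range as described above. Ring wraparound is left out because the keys never go past the last range.

```java
import java.util.TreeMap;

final class RingRemovalDemo {

    // Owner = node whose range start is the greatest one <= key.
    static String owner(TreeMap<Integer, String> ring, int key) {
        return ring.floorEntry(key).getValue();
    }

    // Removes node B and counts how many of the 300 keys change owner.
    static int movedAfterRemovingB() {
        TreeMap<Integer, String> before = new TreeMap<>();
        before.put(1, "A");   // A handles 1..100
        before.put(101, "B"); // B handles 101..200
        before.put(201, "C"); // C handles 201..300

        TreeMap<Integer, String> after = new TreeMap<>(before);
        after.remove(101);    // node B is lost

        int moved = 0;
        for (int key = 1; key <= 300; key++) {
            if (!owner(before, key).equals(owner(after, key))) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println(movedAfterRemovingB()); // 100
    }
}
```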

Node Added


A new node C.1 is added, and it shares the load by taking over part of the keys from C.

A few things are done to balance the load.

 Rebalancing by adjusting ranges.

Under this approach a background process checks the load (i.e. number of keys) on the nodes. Any node that is overloaded or underloaded is rebalanced.


Add virtual nodes.

One of the problems with consistent hashing is that some nodes become overloaded. To manage that it uses the concept of virtual nodes, which are replicas of a node placed at different positions on the ring. This adds more nodes/slots and improves the probability of better load balancing.

Virtual nodes also give a nice option for managing nodes with different capacity: if the cluster contains a node that is 2X bigger than the other nodes, it can have more replicas than the others.


 Let's look at one simple implementation. The code contains 2 components: one is ConsistentHashing and the other is a distributed hash table built on top of it.

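import java.nio.charset.StandardCharsets;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Function;

public class ConsistentHashing<T> {

    private final Function<byte[], Integer> hashFunction;
    private final int replica;
    private final SortedMap<Integer, T> ring = new TreeMap<>();
    private final Function<T, String> nodeKey;

    // Constructor was not shown in the post; the parameter order is an assumption.
    public ConsistentHashing(Function<byte[], Integer> hashFunction, int replica,
                             Function<T, String> nodeKey) {
        this.hashFunction = hashFunction;
        this.replica = replica;
        this.nodeKey = nodeKey;
    }

    // Places `replica` virtual nodes on the ring for one physical node.
    public void add(T node) {
        for (int c = 0; c < replica; c++) {
            String key = String.format("%s_%s", nodeKey.apply(node), c);
            ring.put(hashFunction.apply(key.getBytes(StandardCharsets.UTF_8)), node);
        }
    }

    public T findSlot(Object key) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no nodes in the ring");
        }
        int hash = hashFunction.apply(key.toString().getBytes(StandardCharsets.UTF_8));
        return findClosestSlot(hash);
    }

    // First virtual node at or after the hash, wrapping around to the start of the ring.
    private T findClosestSlot(int hash) {
        SortedMap<Integer, T> tail = ring.tailMap(hash);
        int keyHash = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
        return ring.get(keyHash);
    }

}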

Distributed Hash

public class DistributedHashTable {
private final ConsistentHashing<Node> hash;

public DistributedHashTable(ConsistentHashing<Node> hash) {
this.hash = hash;
}

public void put(Object key, Object value) {
findSlot(key).put(key, value);
}

private Node findSlot(Object key) {
return hash.findSlot(key);
}

public Object get(Object key) {
return findSlot(key).get(key);
}
}

Consistent hashing is very popular in many distributed systems like Cassandra, DynamoDB, Couchbase, Akka Router, Riak and many more.

Rendezvous Hashing

Rendezvous hashing is an alternative to consistent hashing and is more general purpose. It came out around the same time but did not become as popular.

The idea is based on a score that is computed from the node and the key; the node with the highest score is picked for handling the request. The hash function looks something like hash(node-x, key).




A few things make this algorithm different.

Low overhead

Picking the server is low overhead because it needs very little computation and no coordination is required.

No coordination

This property makes the algorithm very unique: all the nodes that are trying to identify a server for request processing will select the same server without communicating with other nodes in the cluster. Think of it as no global lock being required to make the decision.

Heterogeneous Servers

One of the problems with consistent hashing is that some of the servers will be overloaded. To manage that, additional replicas/virtual nodes are added, which results in a long list of servers to select from. This also becomes a problem when a cluster contains servers with different compute/storage capacity, because a generic replica factor (e.g. 3) can't be used any more.

In this algorithm virtual servers are optional and only required if servers of different capacity are added to the cluster.

Better load balancing

The hash code has a random property, due to which each server has an equal probability of getting picked.

No duplicate work

For example, if a client request is going to broadcast 10 GB of data, doing this multiple times will definitely choke the network bandwidth.
This algorithm guarantees that each request maps to a single server with little or no coordination, so duplicate work can be totally avoided, which saves the extra network overhead.

Minimal Disruption

This property makes it a very good choice, especially when the cluster is very dynamic. In case of node failure only the keys mapped to the failed server need to be redistributed, which is a significant saving when the data set is huge.

Code snippet for node selection/assignment

public N assignNode(Object key) {

long value = Long.MIN_VALUE;
N node = null;
for (N n : nodes.values()) {
long newValue = hash.apply(n, key);
if (newValue > value) {
node = n;
value = newValue;
}
}
return node;
}
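To see the minimal disruption property in action, here is a self-contained sketch. The scoring function is a stand-in made up for this example (any well-mixed hash of node and key would do), and the node names are hypothetical. It assigns keys with 4 nodes, removes one node, and counts keys that changed owner even though their owner was not the removed node.

```java
import java.util.Arrays;
import java.util.List;

final class RendezvousDemo {

    // Hypothetical score: hash of node and key combined, then mixed.
    static long score(String node, String key) {
        long h = (node + "|" + key).hashCode();
        h *= 0x9E3779B97F4A7C15L;
        return h ^ (h >>> 32);
    }

    // Highest score wins; every caller computes the same winner
    // without talking to anyone else.
    static String assign(List<String> nodes, String key) {
        String best = null;
        long bestScore = Long.MIN_VALUE;
        for (String node : nodes) {
            long s = score(node, key);
            if (best == null || s > bestScore) {
                best = node;
                bestScore = s;
            }
        }
        return best;
    }

    // Keys that moved although their original owner is still alive.
    static int unexpectedMoves(int keyCount) {
        List<String> all = Arrays.asList("n1", "n2", "n3", "n4");
        List<String> withoutN2 = Arrays.asList("n1", "n3", "n4");
        int unexpected = 0;
        for (int i = 0; i < keyCount; i++) {
            String key = "key-" + i;
            String before = assign(all, key);
            String after = assign(withoutN2, key);
            if (!before.equals("n2") && !before.equals(after)) {
                unexpected++;
            }
        }
        return unexpected;
    }

    public static void main(String[] args) {
        System.out.println(unexpectedMoves(1000)); // 0
    }
}
```

Only the keys that were owned by n2 get a new home; every other key keeps its node, because removing a loser never changes who had the highest score.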

The Apache Ignite distributed database is based on Rendezvous hashing. Some other applications are caches, monitoring applications & exclusive task processing.


Range

This is one of the simplest and yet most powerful ways of partitioning data, and relational databases have used it for ages.

The idea of range partitioning comes from how books are arranged in a library or the way words are laid out in a dictionary. One important assumption for taking advantage of this style is that keys are ordered; if keys are random then this scheme is not a good fit.




Range implementations can vary based on need. Let's look at some of the options.

Static Range

As the name suggests, the range is static and never changes over the lifecycle of the service. A static range works well if data is distributed evenly. A good example is time series data, which can benefit from static ranges where a time period (i.e. day/hour/minute) is one bucket. Some implementations add a little bit of dynamic nature to this style by creating new partitions based on a range pattern, for example a new partition is created every hour.
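For the hourly case, the bucket can be derived straight from the event timestamp. A minimal sketch, assuming the partition is simply named after the start of its hour in UTC:

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

final class HourlyPartition {

    // Every event in the same hour maps to the same partition name.
    static String partitionFor(Instant eventTime) {
        return eventTime.truncatedTo(ChronoUnit.HOURS).toString();
    }

    public static void main(String[] args) {
        System.out.println(partitionFor(Instant.parse("2021-04-04T10:15:30Z"))); // 2021-04-04T10:00:00Z
        System.out.println(partitionFor(Instant.parse("2021-04-04T10:59:59Z"))); // 2021-04-04T10:00:00Z
        System.out.println(partitionFor(Instant.parse("2021-04-04T11:00:00Z"))); // 2021-04-04T11:00:00Z
    }
}
```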

Real data is skewed, so a static range runs into problems when one of the partitions becomes a hot partition: the performance of the whole system degrades and the resources are not utilized evenly.

Dynamic Range

Some of the newer data systems add this innovation to make the system scalable. Let's look at how it works.

It starts with an empty container with some capacity; in this example let's use 5.
Once the container is full it splits in half, and this process keeps on going. This way of creating partitions dynamically handles skewed data and is also a very scalable approach.





In the above example, adding 9 keys has created 3 nodes, each containing an equal number of values. At the end, the partitions will look something like below.



Just to highlight again, the ordering property is maintained during the split process, which makes it a very good fit for efficient range scans.
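The split process can be sketched in a few lines. One assumption is baked in: a partition splits when a key arrives that pushes it past its capacity of 5, which is what reproduces the 9 keys / 3 partitions outcome above. The class name and the use of integer keys are only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

final class DynamicRangePartitions {

    private final int capacity;
    // Start key of each partition -> the sorted keys it holds.
    private final TreeMap<Integer, TreeSet<Integer>> partitions = new TreeMap<>();

    DynamicRangePartitions(int capacity) {
        this.capacity = capacity;
        // Start with one empty partition that covers every key.
        partitions.put(Integer.MIN_VALUE, new TreeSet<>());
    }

    void add(int key) {
        TreeSet<Integer> target = partitions.floorEntry(key).getValue();
        target.add(key);
        if (target.size() > capacity) {
            split(target);
        }
    }

    // Moves the upper half into a new partition; ordering is preserved.
    private void split(TreeSet<Integer> full) {
        Integer pivot = new ArrayList<>(full).get(full.size() / 2);
        TreeSet<Integer> upper = new TreeSet<>(full.tailSet(pivot));
        full.tailSet(pivot).clear();
        partitions.put(pivot, upper);
    }

    List<Integer> partitionSizes() {
        List<Integer> sizes = new ArrayList<>();
        for (TreeSet<Integer> partition : partitions.values()) {
            sizes.add(partition.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        DynamicRangePartitions ranges = new DynamicRangePartitions(5);
        for (int key = 1; key <= 9; key++) {
            ranges.add(key);
        }
        System.out.println(ranges.partitionSizes()); // [3, 3, 3]
    }
}
```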


Ordered data has many other benefits that can easily be utilized by a query optimizer.

No discussion of partitioning is complete without talking about request routing. The answer to this question depends on how much complexity you want to push to clients.



In this design the router has all the details of key placement and can route traffic to the right partition.
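A router for range partitions can be as small as a sorted lookup table. The sketch below is hypothetical (the class name, node names and range boundaries are all made up): each partition is registered by the first key of its range, and a request goes to the partition with the closest start at or below the key.

```java
import java.util.Map;
import java.util.TreeMap;

final class RangeRouter {

    // First key of each range -> node that serves the range.
    private final TreeMap<String, String> rangeStartToNode = new TreeMap<>();

    void register(String rangeStart, String node) {
        rangeStartToNode.put(rangeStart, node);
    }

    String route(String key) {
        Map.Entry<String, String> entry = rangeStartToNode.floorEntry(key);
        if (entry == null) {
            throw new IllegalArgumentException("no partition covers " + key);
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        RangeRouter router = new RangeRouter();
        router.register("a", "node-1"); // keys from "a" up to "n"
        router.register("n", "node-2"); // keys from "n" onwards
        System.out.println(router.route("bill.gates@microsoft.com")); // node-1
    }
}
```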

Conclusion

Data partitioning is key to building a distributed system. It comes with many other challenges, like how to rebalance partitions without any downtime and how to process queries that need access (i.e. read and write) to multiple partitions.

Partition replication comes after partitioning and is equally challenging.