Showing posts with label Distributed Database. Show all posts
Showing posts with label Distributed Database. Show all posts

Sunday, 4 April 2021

Data partitioning approach.

Partition is the technique of breaking a large data set into small manageable chunks.

It is important to define partition strategy such that a single piece of information belongs to a single partition only, good partition strategy enables data level parallelism.

Partition is not a new idea; it comes from the RDBMS world where it is used for creating partition indexes.

Atomicity between partitions, especially for writing, is hard to achieve and it requires some tradeoff.

Partition should not be confused with replication, replication is keeping multiple copies of a single partition for high availability or fault tolerance. 

Partition + Replication makes any data system fully distributed. 

In this post we will look at different approaches to data partitioning. Lets pick email address dataset as a sample data for partition example.

Let's take bill.gates@microsoft.com email as an example.

This email can be partitioned by hash(bill.gates@microsoft.com) or by range(bill.gates@microsoft.com) based on how it is queried. 

Hash

Hash one is simplest to understand but when we are in a distributed environment then it is not that simple because nodes that are storing data can go offline or new nodes are added to manage load. Due to dynamic node numbers we don't have a constant denominator to find mod for key.

It is important to understand why hash table type of data structure can't be used for data partitioning in distributed systems.

Classic hash table starts with N capacity and based on load factor(i.e how much it is filled) it will resize and all the keys need to rehash based on new capacity.
This resize is an expensive operation and also during resize the hash table might not be available for some operations.

Resizing a distributed dataset will mean shuffling all the keys and when a dataset contains billions or trillion of keys then it is going to generate a huge load on the system and also making it unavailable for end users.

If we look in terms of BIG O then classic hash table complexity will look something like below.



It is very clear that the Add/Remove node option is going to bring everything to standstill. Some distributed systems like Kafka have fixed no of partition strategies to avoid exactly this problem. 

So what can be done in this case ?
Key insight is using a range of hash codes to represent keys and it helps in reducing overhead of key migration when a new node is added or removed.


Sets up nicely for next section

Consistent Hashing

Consistent hashing is now a new idea but it is more than a 2 decade old idea that was mentioned in paper.

This is a special hash function which changes minimally as new nodes are added/removed. In this approach both keys and slots/nodes are hashed using the same function and enables mapping nodes to intervals of keys.

All the nodes are placed in a ring and each node is responsible for range of hash. Due to this range hash property whenever a node is added or removed k/n keys need to be replaced where k is total number of keys and n is number of nodes or slots.




Each node is responsible for a range of keys due to which when a node is added or removed only part of keys needs to be replaced. Let's look at example 

Node removed




In this example node B is lost and node A takes up responsibility and now handles range ( 1 to 200). Only keys between 101 to 200 need to move and this is much better than a naïve hash table.

Node Added


New node C.1 is added and it shares responsibility by handling part of keys from C.

Few things are done to balance load

 Rebalancing by adjusting ranges.

Under this approach the background process is checking load(i.e. no of keys) on nodes. Any node that is overloaded or underloaded is balanced. 


Add virtual nodes.

One of the problem with consistent hash is that some nodes become overload and to manage that it uses concept of virtual nodes, which are replica of nodes. This adds more nodes/slots and improves probability of better load balancing. 

"Virtual Node" also gives one nice option to manage nodes with different capacity, if cluster contains node that is 2X bigger than other nodes then it can have more replica compared to others.


 Lets look at one simple implementation. Code contains 2 components one is ConsistentHash and Distributed hash using consistent hash.

public class ConsistentHashing<T> {

private final Function<byte[], Integer> hashFunction;
private final int replica;
private final SortedMap<Integer, T> ring = new TreeMap<>();
private final Function<T, String> nodeKey;


public void add(T node) {
for (int c = 0; c < replica; c++) {
String key = String.format("%s_%s", nodeKey.apply(node), c);
ring.put(hashFunction.apply(key.getBytes()), node);
}
}

public T findSlot(Object key) {
int hash = hashFunction.apply(key.toString().getBytes());
return ring.getOrDefault(hash, findClosestSlot(hash));
}

private T findClosestSlot(int hash) {
SortedMap<Integer, T> tail = ring.tailMap(hash);
int keyHash = tail.isEmpty() ? ring.firstKey() : tail.firstKey();
return ring.get(keyHash);
}

}

Distributed Hash

public class DistributedHashTable {
private final ConsistentHashing<Node> hash;

public DistributedHashTable(ConsistentHashing<Node> hash) {
this.hash = hash;
}

public void put(Object key, Object value) {
findSlot(key).put(key, value);
}

private Node findSlot(Object key) {
return hash.findSlot(key);
}

public Object get(Object key) {
return findSlot(key).get(key);
}
}

Consistent is very popular in many distributed systems like Casandra, DynamoDB,CouchBase, Akka Router, Riak and many more.

Rendezvous Hashing

Rendezvous hashing is alternative to consistent hashing and its more general purpose. This also came at the same time but did not become that popular.

Idea is based on a score that is computed by using node & key, and the node with the highest score is picked up for handling requests. Hash function looks something like hash(node-x,key)




Few things that make this algorithm different

Low overhead

Picking the server is low overhead because it needs very less computation and no coordination is required.

No coordination

This property of the algorithm makes it very unique because all the nodes that are trying to identify a server for request processing will select the same server without communication with other nodes in cluster. Think like no global lock is required to make decision.

Heterogenous Servers

One of the problems with consistent hashing is that some of the servers will be overloaded and to manage that additional replicas/virtual nodes are added and that result in a long list of servers to select from. This also becomes a problem when a cluster contains a server with different compute/storage capacity because now generic replica factor (i.e 3) can't be used.

In this algorithm virtual servers are optional and only required if servers of different capacity are added in node.

Better load balancing

Hash code has random property due to which each server has equal probability of getting picked.

No duplicate work

For eg client request is going to broadcast 10Gb data and doing this multiple times will definitely choke network bandwidth.
This algorithm gives guarantee that each request will only map to a single server with no/less coordination, so duplicate work can be totally avoided and it will result in saving extra network overhead.

Minimal Disruption

This property makes it very good selection especially when cluster is very dynamic. Incase of node failure only keys mapped to failed server needs to redistributed and it becomes very significant overhead when data set is huge.

Code snippet for node selection/assignment

public N assignNode(Object key) {

long value = Long.MIN_VALUE;
N node = null;
for (N n : nodes.values()) {
long newValue = hash.apply(n, key);
if (newValue > value) {
node = n;
value = newValue;
}
}
return node;
}

Apache Ignite distributed database is based on Rendezvous hashing. Some other applications are cache, monitoring applications & exclusive task processing.  


Range

This is one of the simple and yet very powerful way for data partition and relational database has used it for ages.

Idea of range partition comes from how books are arrange in library or the ways words are put in dictionary. One of the important assumption to take advantage of this style is ordering of keys, if keys are random then this scheme is not good fit. 




Range implementations can be vary based on need, lets look at some of options.

Static Range

As name suggest , it is static range and never changes over lifecycle of service. Static range works well if data is distributed evenly. Some of the good examples are time series data that can benefit by static ranges where time period ( i.e day/hour/minute) is one bucket. Some implementations adds little bit of dynamic nature to this style by creating new partition based on range pattern like every hour new partition is created.

Real data is skewed, so static range runs in problem when one of the partition becomes hot partition and whole system performance degrades and also not all the resources are utilized properly.

Dynamic Range

Some of the new data systems add this innovation to make the system scalable. Let's look at how it works.

It starts with an empty container with some capacity, in this example let's use 5.
Once the container is full then it splits in half, this process keeps on going. This way of dynamic partition creation handles skewed data and also a very scalable approach. 





In the above example adding 9 keys has created 3 nodes, each node containing an equal number of values. At the end partition will looking something like below



Just to highlight again that ordering property is maintained during split process and makes it very good fit for efficient range scan.


Ordered data has many other benefits that can easily utilized query optimizer.

Any discussion on partition can't be completed without talking about request routing, answer to this question depends on the complexity that you want to push to clients.



In this design router has all the details of keys placement and it can route traffic.

Conclusion

Data partition is key for building a distributed system. It has many other challenges like how to rebalance partitions without any downtime, how to process query that needs access(i.e. read and write) to multiple partitions.

Partition replication comes after partition and it is also equally challenging.

Tuesday, 8 December 2020

Disk storage algorithm

 This is follow up post on rethinking-key-value-store article to explore storage part of system.



Many data systems support plugin based storage layers and it opens a whole set of options to use one from a shelf or build one that suits your needs.

In this post i will share how a new storage system can be built and later it is used for building time series application on top of it.

Before we go deep in disk based algorithm, let's look at why non-volatile storage is required.

In today times when machines with Terabytes RAM are available why do we have to bother to store stuff on disk ? 

Couple of good reasons why still having good non-volatile storage manager makes sense today.

  • It is cheap

Disk is very cheap as compared RAM, so it does not make sense to store data in expensive store especially when data is not being used frequently. Lots of cloud provider bill can saved! 

  • It is unlimited

Although a machine with big RAM is available but it is still limited , it will not continue get bigger at the same rate as in the past and if we want applications to have the illusion that it has unlimited memory then flushing to disk is a must.  

  • Allow fast restarts
Think what will happen if application crash ? Application has to rebuild the whole state and it could take very long time before application is available again to serve request. Saving computed data to disk and restoring from it will be way faster.

  • Information exchange 

How do two application running on different machine can communicate ? For inter application communication in-memory data has to written in wire format so that it can be sent over network.

and many more reasons..

Application has volatile & non-volatile area and storage manager sits in middle of that (.ie RAM and Disk) and provide efficient access to data.





RAM and DISK are very different types of hardware and access patterns are also very different.

On one hand RAM can be accessed randomly and it is fast for both read/write.

Disks are accessed sequentially using blocks and very slow write and slow read, SSD has improved the access time but sequential access is the recommended to get best performance.

Storage managers have to use efficient data structure on disk to get best performance, another thing is that disk has nothing like malloc to manage allocation. Everything is bare metal and the application developer has to manage allocation, garbage collector, locks etc.

Disk read/write access is based on a block which is usually 4 KB, but memory read/write is based on a cacheline which is 64 Bytes, just this difference in read/write size requires new ways of organizing data to get the best out of the device.  

All the above problems make writing disk based algorithms very challenging.

Lets look at some options of storing data on disk.

Generally file on disk looks some thing like below, each block is of fix sized and it depends on hardware, most of the vendors use 4 KB blocks. IO device provide atomic read/write guarantee at block level. 



Page Layout

Lets unpack disk block to explore options to store data.

Fixed Size

Fixed size data is very common and intuitive way to store data in block provided underlying data is like that and mostly applicable for number variants data type like ( byte, short, int, long , float & double). It is possible to make it work for varchar but padding is required to achieve this. If underlying data can be mapped to fixed size value then this is best option.


Fixed size has good random access property, just by doing simple multiplication specific record can be found for eg to find 3rd record we will use record * sizeof(record) (i.e 3 * 4) to find the offset of data and read it. 
Most of the application has variable record size due to which more flexible storage layout is required. 


Size Prefix

In this approach every record is prefixed with 4 Byte size and followed with data.
This has overhead of extra 4 Bytes and sometime this can be more than actual data and other thing that is not good is that it is sequential access, if last record is required then it requires to scan full page.
One more downsize is what happens when records are updated ? this will cause overflow or fragmentation. 

%3CmxGraphModel%3E%3Croot%3E%3CmxCell%20id%3D%220%22%2F%3E%3CmxCell%20id%3D%221%22%20parent%3D%220%22%2F%3E%3CmxCell%20id%3D%222%22%20value%3D%22%22%20style%3D%22rounded%3D1%3BwhiteSpace%3Dwrap%3Bhtml%3D1%3BfillColor%3D%23fff2cc%3BstrokeColor%3D%23d6b656%3B%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%22890%22%20y%3D%22385%22%20width%3D%22190%22%20height%3D%22130%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3C%2Froot%3E%3C%2FmxGraphModel%3E



This is good for queue based system where write is always at the end and read is also large sequential scan. 

Slotted Page

This approach is hybrid one and takes advantage of both fixed and size prefix page.





Slotted page is transformation of Size Prefix page to co-locating related data, for eg all data is together and all size is together.

Single page contains 2 Region
  • Header Region
This section contains some metadata about the page that include version, page id , hash, number of records , compression flag etc. 
 
  • Data Region
Data section is subdivided in data segment & index segment. Index segment is also called as Slot Array and it can be 4 byte or 2 byte fixed size value, it contains pointer to data segment.
Data Segment is written from left and Slot Array is written from right side. Page is considered full once no space is available for either data segment or Slot.

This approach gives random access to records by using Slot array, every record can be addressed by (PageId, Record Id). Full file content can be seen as a 2 dimensional array.


Slotted page is a very popular layout for many databases. This also allows to build sparse or dense indexes based on page & slot.


Disk Data Structure 

Now we will explore how smallest unit of storage (i.e. page) can be taken to build some data structures on top of it and finally some application using disk based data structure.

Remember disk has no malloc API, so we have to build something like pagealloc that will enable dynamic allocation of blocks/page.


Page Allocator interface is an low level API and API looks something like below.

public interface PageAllocator {
WritePage newPage();

long commit(WritePage page);

ReadPage readByPageId(int pageId);

int noOfPages();

int pageSize();

byte version();

List<PageInfo> pages();

ReadPage readByPageOffset(long offSet);

}

Application - Time Series Database

Using Page allocator abstraction we will build Time series database that will use Sorted String table as underlying store.

SSTable stores immutable rows that are ordered by some key in files. SSTable is basis for Log structured merge tree that is alternative to B+Tree.

Log structured merge tree powers many popular data stores like BigtableHBaseLevelDB RocksDBWiredTiger, CassandraInfluxDB and many more.


SSTable

SSTable is made of couple of ordered memory maps & ordered rows on disk. Storage manage sits right in middle to manage sorted structures on disk & memory.




Writers  

All the write requests are handled by writing to In-Memory ordered map and once those maps are full then get converted to read only In-Memory maps and periodically flushed to disk for durability.  

Writing to such a system is very fast because it is done using in-memory data structure. 

Readers

Readers is where this gets more interesting because now read has to hit multiple data structures to find records. First it scans mutable map, then immutable maps and finally on the disk.
Rather than doing a single seek it has to do multiple seeks but since all the data structure be on memory or disk is sorted, so requests can be processed in LOG N time.

Over a period of time a number of files can grow, so a compaction process is required that will merge multiple sorted files and create a small number of files. This compaction process is what makes SSTable as Log structured merge tree.

Some code !
To have some thing working i used ConcurrentSkipListMap from JDK to create In-Memory ordered map and use PageAllocator to flush ordered map to disk.

SortedStringTable

public interface SortedStringTable<V> {

void append(String key, V value);

void iterate(String from, String to, Function<V, Boolean> consumer);

// API for saving SST table for persistence storage
Collection<PageRecord<V>> buffers();

void remove(int pageId);

void flush();
}
Working SSTable code can be found @ sst github.


First data structure is ready for our time series database :-)

Time Series 

Time series application will be built on top of SSTable.


Timeseries interface is simple, it looks something like below.

public interface TimeSeriesStore {

<T> EventInfo insert(T row);

void gt(LocalDateTime fromTime, Function<EventInfo, Boolean> consumer);

void lt(LocalDateTime toTime, Function<EventInfo, Boolean> consumer);

void between(LocalDateTime startTime, LocalDateTime endTime, Function<EventInfo, Boolean> consumer);

void flush();


}
Time series application code can be found @ timeseries repo.

To experiment with some some real time series data, i picked up sample data from Jan Yellow Taxi Trip and loaded in the app. yellow_tripdata_2020-01 has 6+ Million records.

Sample time series application using this data can be found @ NYTaxiRides.java

All the code has good unit test coverage, so feel free to hack and learn.

Conclusion

Disk based algorithm are very cool and understanding it gives good idea about how modern data systems work. You might not build data system from scratch but knowing these algorithm will definitely help in deciding which data system to pick based on trade off.

Monday, 16 November 2020

Rethinking key value store

In this post i will share how key-value stores can be used for building different types of application, but before we start deep dive lets do quick recap of database systems.

Database technology is made of 2 important components ( Storage + Query). Such type of architecture allow to replace part of system or only build storage or query part of it and reuse other from open source.




Many modern database are taking advantage of plugin architecture and focusing on only one part of system, for example storage could be again broken down in various format based on access pattern for reads/write


 

Each storage format ( hash,b-tree,lsm or log) is optimized for specific read/write access pattern. Many database uses multiples format also.

Once we come to storage format then Row vs Column layout also comes in play, so in nutshell it is hard to build storage system, so it makes sense to use off the self solution rather than building something from ground up.

Query area is also huge as this include different types of API, Query engine, Optimizer , transactions etc



%3CmxGraphModel%3E%3Croot%3E%3CmxCell%20id%3D%220%22%2F%3E%3CmxCell%20id%3D%221%22%20parent%3D%220%22%2F%3E%3CmxCell%20id%3D%222%22%20style%3D%22edgeStyle%3Dnone%3Brounded%3D1%3BorthogonalLoop%3D1%3BjettySize%3Dauto%3Bhtml%3D1%3BentryX%3D0.5%3BentryY%3D0%3BentryDx%3D0%3BentryDy%3D0%3B%22%20edge%3D%221%22%20source%3D%224%22%20target%3D%228%22%20parent%3D%221%22%3E%3CmxGeometry%20relative%3D%221%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%223%22%20style%3D%22edgeStyle%3Dnone%3Brounded%3D1%3BorthogonalLoop%3D1%3BjettySize%3Dauto%3Bhtml%3D1%3BentryX%3D0.5%3BentryY%3D0%3BentryDx%3D0%3BentryDy%3D0%3B%22%20edge%3D%221%22%20source%3D%224%22%20target%3D%229%22%20parent%3D%221%22%3E%3CmxGeometry%20relative%3D%221%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%224%22%20value%3D%22Storage%22%20style%3D%22rounded%3D1%3BwhiteSpace%3Dwrap%3Bhtml%3D1%3BfillColor%3D%23fff2cc%3BstrokeColor%3D%23d6b656%3B%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%22320%22%20y%3D%22270%22%20width%3D%22200%22%20height%3D%2240%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%225%22%20style%3D%22rounded%3D1%3BorthogonalLoop%3D1%3BjettySize%3Dauto%3Bhtml%3D1%3B%22%20edge%3D%221%22%20source%3D%228%22%20target%3D%2210%22%20parent%3D%221%22%3E%3CmxGeometry%20relative%3D%221%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%226%22%20value%3D%22%22%20style%3D%22rounded%3D0%3BorthogonalLoop%3D1%3BjettySize%3Dauto%3Bhtml%3D1%3B%22%20edge%3D%221%22%20source%3D%228%22%20target%3D%2211%22%20parent%3D%221%22%3E%3CmxGeometry%20relative%3D%221%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%227%22%20style%3D%22rounded%3D0%3BorthogonalLoop%3D1%3BjettySize%3Dauto%3Bhtml%3D1%3BentryX%3D0.342%3BentryY%3D-0.033%3BentryDx%3D0%3BentryDy%3D0%3BentryPerimeter%3D0%3B%22%20edge%3D%221%22%20source%3D%228%22%20target%3D%2212%22%20parent%3D%221%22%3E%3CmxGeometry%20relative%3D%221%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%228%22%20value%3D%22Disk%22%20style%3D%22rounded%3D1%3BwhiteSpace%3Dwrap%3Bhtml%3D1%3BfillColor%3D%23f8cecc%3BstrokeColor%3D%23b85450%3B%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%22250%22%20y%3D%22360%22%20width%3D%22140%22%20height%3D%2230%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%229%22%20value%3D%22Memory%22%20style%3D%22rounded%3D1%3BwhiteSpace%3Dwrap%3Bhtml%3D1%3BfillColor%3D%23e1d5e7%3BstrokeColor%3D%239673a6%3B%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%22480%22%20y%3D%22360%22%20width%3D%22120%22%20height%3D%2235%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%2210%22%20value%3D%22Flash%2FSSD%22%20style%3D%22rounded%3D1%3BwhiteSpace%3Dwrap%3Bhtml%3D1%3BfillColor%3D%23f8cecc%3BstrokeColor%3D%23b85450%3B%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%2290%22%20y%3D%22440%22%20width%3D%22120%22%20height%3D%2230%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%2211%22%20value%3D%22HDD%22%20style%3D%22rounded%3D1%3BwhiteSpace%3Dwrap%3Bhtml%3D1%3BfillColor%3D%23f8cecc%3BstrokeColor%3D%23b85450%3B%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%22240%22%20y%3D%22450%22%20width%3D%22120%22%20height%3D%2230%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%2212%22%20value%3D%22Cloud%22%20style%3D%22rounded%3D1%3BwhiteSpace%3Dwrap%3Bhtml%3D1%3BfillColor%3D%23f8cecc%3BstrokeColor%3D%23b85450%3B%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%22370%22%20y%3D%22480%22%20width%3D%22120%22%20height%3D%2230%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%2213%22%20style%3D%22edgeStyle%3Dnone%3Brounded%3D1%3BorthogonalLoop%3D1%3BjettySize%3Dauto%3Bhtml%3D1%3BentryX%3D0.5%3BentryY%3D0%3BentryDx%3D0%3BentryDy%3D0%3B%22%20edge%3D%221%22%20source%3D%2214%22%20target%3D%224%22%20parent%3D%221%22%3E%3CmxGeometry%20relative%3D%221%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%2214%22%20value%3D%22API%22%20style%3D%22rounded%3D1%3BwhiteSpace%3Dwrap%3Bhtml%3D1%3BfillColor%3D%23dae8fc%3BstrokeColor%3D%236c8ebf%3B%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%22320%22%20y%3D%22190%22%20width%3D%22200%22%20height%3D%2240%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%2215%22%20style%3D%22edgeStyle%3Dnone%3Brounded%3D1%3BorthogonalLoop%3D1%3BjettySize%3Dauto%3Bhtml%3D1%3BentryX%3D0.5%3BentryY%3D0%3BentryDx%3D0%3BentryDy%3D0%3B%22%20edge%3D%221%22%20source%3D%2216%22%20target%3D%2214%22%20parent%3D%221%22%3E%3CmxGeometry%20relative%3D%221%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%2216%22%20value%3D%22SQL%22%20style%3D%22rounded%3D1%3BwhiteSpace%3Dwrap%3Bhtml%3D1%3BfillColor%3D%2360a917%3BstrokeColor%3D%232D7600%3BfontColor%3D%23ffffff%3B%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%22260%22%20y%3D%22110%22%20width%3D%22120%22%20height%3D%2230%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%2217%22%20style%3D%22edgeStyle%3Dnone%3Brounded%3D1%3BorthogonalLoop%3D1%3BjettySize%3Dauto%3Bhtml%3D1%3BentryX%3D0.5%3BentryY%3D0%3BentryDx%3D0%3BentryDy%3D0%3B%22%20edge%3D%221%22%20source%3D%2218%22%20target%3D%2214%22%20parent%3D%221%22%3E%3CmxGeometry%20relative%3D%221%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%2218%22%20value%3D%22Key%20Value%22%20style%3D%22rounded%3D1%3BwhiteSpace%3Dwrap%3Bhtml%3D1%3BfillColor%3D%231ba1e2%3BstrokeColor%3D%23006EAF%3BfontColor%3D%23ffffff%3B%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%22450%22%20y%3D%22110%22%20width%3D%22120%22%20height%3D%2230%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3CmxCell%20id%3D%2219%22%20value%3D%22Database%20Engine%22%20style%3D%22text%3Bhtml%3D1%3BstrokeColor%3Dnone%3BfillColor%3Dnone%3Balign%3Dcenter%3BverticalAlign%3Dmiddle%3BwhiteSpace%3Dwrap%3Brounded%3D0%3BfontFamily%3DVerdana%3BfontSize%3D18%3BfontStyle%3D1%22%20vertex%3D%221%22%20parent%3D%221%22%3E%3CmxGeometry%20x%3D%22320%22%20y%3D%2230%22%20width%3D%22200%22%20height%3D%2240%22%20as%3D%22geometry%22%2F%3E%3C%2FmxCell%3E%3C%2Froot%3E%3C%2FmxGraphModel%3E

With some high level understanding of databases now we can now focus on specific type of storage system and try to explore what types of application can be built on it.

Key-Value store is very popular storage system options. 




Key value store are like hash table that is designed to store/retrieve key & value tuple. Key-Value store can be memory or disk based. Some key value stores variations like B-Tree or LSM tree are ordered key value store.
Ordered KV stores provide efficient range scan and is very important if client interface is SQL like.

I will used ordered KV store as example to share type of application that can be build. I will use ecommerce Order data model as example for all the usecase




- Row Store
  This is the most straight forward use-case for any KV stores and also very good fit for building unique index for primary key. For order table KV store will contain below entry

Order/OrderId/1 -> {......}
Order/OrderId/2 -> {......}
Order/OrderId/3 -> {......}

Important thing to note in this format is that it contains fully qualified id of record that us made up of {table name}/{PK Column}/{PK Value}.

This type of key provides flexibility to use single KV store for multiple tables, another advantage that we get it since all the values are ordered by Key so query has to load less numbers of pages to get all the data for this table.

Indexes
Ordered KV store can be extended to build non unique indexes also so that it can be used as efficient lookup table for underlying data.

Lets try to build couple of non unique index on order table.

Index on customer id 

Order/CustomerId/100/1 -> {......}
Order/CustomerId/101/1 -> {......}
Order/CustomerId/102/2 -> {......}

Index on Order Date

Order/OrderDate/20201001/1 -> {......}
Order/OrderDate/20201001/2 -> {......}
Order/OrderDate/20201010/3 -> {......}

Before i show other complex example lets try to understand format of key, it is contains "{table name}/{Index Col}/{Index Value}/{PK}" pattern, this pattern allow to handle duplicate values in index because primary key is suffix of every entry.

Lets keep going on this with little more complex example of index by Status & Order date

Order/StatusByOrderDate/CANCELLED_20201010/3 -> {......}
Order/StatusByOrderDate/CANCELLED_20201010/6 -> {......}
Order/StatusByOrderDate/PENDING_20201001/2 -> {......}
Order/StatusByOrderDate/PENDING_20201001/7 -> {......}
Order/StatusByOrderDate/PENDING_20201003/8 -> {......}
Order/StatusByOrderDate/SHIPPED_20201001/1 -> {......}

In above example Index key is made of 2 column.

- Range Scans

By this point it will be becoming clear how efficient range scan can be done with above index data.

Lets use some example query to verify this .

 - find all the orders that are cancelled between 20201001 to 20201015.
Above query can be resolved by looking for all the records between Order/StatusByOrderDate/CANCELLED_20201001 to Order/StatusByOrderDate/CANCELLED_20201031

find all the orders that are cancelled in Oct 2020.
All records matching Order/StatusByOrderDate/CANCELLED_202010 pattern

-find all the pending orders
All records matching Order/StatusByOrderDate/PENDING pattern

One thing to note that all the related keys are together and can be retrieved using less number of IO operations. 

- Partition Index
Distributed databases creates partitions to split big data into chunk of manageable data so that it can be distributed and replicated.
Partition can be done using hash key or by key range. Key range based partition are must of interface is SQL or allows range scanning. 


Lets take above data for key column as example set to distributed. 

One way to create partition is to create partition of size 5 and store start & end key of that partition with data node reference, so that query looking for data in that range can be quickly resolved.

Partition index might look something like below.



- Column Store
One more place where ordered KV stores shines is creating column store.

We need slight change in format to achieve column store, it will use {table name}/{column}/{pk} -> {column value} format


This type of key format will put all the column for given table together.
Any query that needs selective column can take advantage of this layout as all the column are next to each other.
 
- SQL
Last one is building SQL interface on ordered map! SQL query need 2 basic operation to answer all the request
 - Point lookup
 - Range scan

With above examples of KV stores it is easy to build simple SQL using above mention access pattern.

Sample Application
As part of the research for this post, i build implementation that is based on all the ideas shared above. High level architecture of simple implementation looks something like below



In this application i have used 3 implementation of Ordered map to show underlying storage can be changed without given up on any functionality. 

- SkipList 
This is java "ConcurrentSkipListMap", which is in memory ordered map and is used in many real open source database to build memory store on top of LSM tree. Cassandra uses SkipList data structure for Memory table.

- B Tree
H2 database is based on B-Tree storage called "MVStore", it is possible to use MvStore as library. I used MVStore as B-Tree storage

- LSM Tree
Many implementation are available and popular ones are LevelDB, RocksDB etc. I used RocksDB as it is already used in many databases as storage engine. MyRocks, CassandraRocks, cockroachdb  etc


Implementation is based on 2 interface ( KeyValueStore & SSTable) to expose both key value and SQL interface on different storage backend.


public interface KeyValueStore {

<Row_Type> SSTable<Row_Type> createTable(String tableName, Class<Row_Type> type,
Map<String, Function<Row_Type, Object>> schema,
Map<String, Function<Row_Type, String>> indexes);

<Row_Type> SSTable<Row_Type> createTable(String tableName, Class<Row_Type> type,
Map<String, Function<Row_Type, Object>> schema);

<Row_Type> SSTable<Row_Type> createTable(TableInfo<Row_Type> tableInfo);

List<String> desc(String table);

void close();

<Row_Type> SSTable<Row_Type> table(String tableName);

default void execute(String sql, Consumer<RowValue> consumer) {
new SqlAPI(this).execute(sql, consumer);
}
}
 
public interface SSTable<T_TYPE> {
List<String> cols();

//Search functions
void scan(Consumer<T_TYPE> consumer, int limit);

void search(String indexName, String searchValue, Consumer<T_TYPE> consumer, int limit);

void search(String indexName, String searchValue, Collection<T_TYPE> container, int limit);

void rangeSearch(String index, String startKey, String endKey, Collection<T_TYPE> container, int limit);

T_TYPE get(String pk);

default Map<String, Function<T_TYPE, Object>> schema() {
return null;
}

default Map<String, Function<T_TYPE, String>> indexes() {
return null;
}

default Object columnValue(String col, Object row) {
return null;
}

//Mutation functions
void insert(T_TYPE row);

void update(T_TYPE record); // Secondary index needs rebuilding
}


All the code is available on github @ query repo.

Each implementation is unit tested using contract based test, so feel free to experiment with it.

While implementation first i build KV related functionality and at the end added simple SQL interface. Adding simple SQL was done with help of Calcite SQL parser. Once basic query primitive was available ( point and range scan) then simple SQL interface was easy to add and that is the reason why i have "sql" as default function on interface. 

Conclusion
Ordered KV store are powerful data structure and can be used for solving many interesting problems. Many commercial and open source database are using some of techniques mention in post. 

In this post i have intentionally not covered about how to handle different data types and how to optimized Index key. These are things that can be solved by better encoding that still maintains sort property of key.

I plan to cover about encoding rules in future post.