Thursday 24 December 2020

Ordered data structure using B+Tree and Skip List

This post is based on the RUM Conjecture paper for data systems. The paper examines the read, update, and memory overheads of data structures and gives some nice examples of how optimizing for two of these factors leaves the third in bad shape.

 
Source : http://daslab.seas.harvard.edu/rum-conjecture/



Some of the popular read-optimized ordered key-value data structures are BST, Skip List, Prefix Tree, B+Tree, etc.

To make reads fast, these structures trade off some space.

In this post I will share a space (i.e. memory) efficient data structure that borrows ideas from B+Tree and Skip List to create a new ordered data structure.

Quick recap before we build our own data structure. 

B+ Tree

B+Tree is a variant of BST in which every node can contain up to n children. The tree contains root, internal, and leaf (i.e. external) nodes.

Leaf nodes contain all the entries. Root and internal nodes contain just keys and pointers to leaf nodes.

Every node has some capacity, and when it gets full a split process creates another node that takes half of the values from the original node. This splitting keeps allocation under control and also reduces rebalancing overhead compared to BST.


B+Tree. Source:Wikipedia

This is a very popular data structure for databases; all relational databases and many non-relational databases provide indexes that are backed by a B+Tree.

Its popularity has only increased over time.

Now some databases are built using Log structured merge trees, which solve some of the issues with B+Tree.

One of the trade-offs with B+Tree is that keys are stored in multiple places (leaf & internal nodes) and node splits can cascade up the tree.

Every value is wrapped in a node container, which adds an extra 32 bits of overhead per value and an extra indirection to get to the value.

Extra indirection does not work well with modern CPUs and their wide cache lines.

Skip List

Skip list takes ideas from the ordered linked list; to make reads and writes efficient it adds upper layers (lanes) of sorted linked lists that contain fewer values and act as fast lanes to get to a value.

 

Skip List. Source:Wikipedia

This type of structure helps with concurrency because installing a new value can be done with a lock-free operation like CAS.

One of the big trade-offs in skip lists is that many keys are duplicated across lanes and the level for a key is decided randomly, so the skip levels can turn out very different on every run even though the input is the same.

Like B+Tree, it also has memory overhead due to the extra container objects required to hold the values.

Let's look at a new data structure that takes some ideas from these two data structures to reduce memory overhead while still maintaining performance.

B+Skip List

I don't have any good name for this DS, so let's call it B+SkipList.

I will take a few ideas:

- Ordered memory block

B+Tree allocates nodes that can hold n values in an array. These values are stored in order using simple array manipulation. This gives the nice property that related values are close together and have a good chance of being in the same cache line.

- Split node when full

B+Tree splits a full node into two to keep the tree balanced.

- Skip Lanes

Although SkipList is just an ordered linked list, the skip-lane idea makes the linked list behave more like an array: lower level nodes can be accessed quickly via skip lanes.


Let's create something!

An ordered memory block is straightforward to implement. It will look something like below.

Ordered Array



Code snippet for the ordered collection. I will use int values for simplicity, but the same approach applies to any custom object.

public OrderedIntCollection(int maxElement) {
    this.maxElement = maxElement;
    this.members = new int[1 + maxElement];
    this.members[0] = Integer.MAX_VALUE; // Sentinel marking the end of live values
    this.length = 0;
}

public void insert(int value) {
    this.min = Math.min(min, value);
    this.max = Math.max(max, value);

    int offSet = offSet(value, members, length); // Find insertion position

    if (members[offSet] == value) {
        return; // Value already present, skip copy overhead
    }

    shiftFrom(offSet); // Make room by shifting the tail to the right
    members[offSet] = value;
    length++;
}


Our first building block is ready; let's move on to the split logic now.

- Capacity check.

This is required to decide whether a split is needed.

- Split logic.

This does the actual split and returns a new node.

- Additional metadata

Since nodes can now be split, we need to track some metadata like the min and max values.

The split process might look like below.



Splitting is one way to handle a full node and it has some advantages, but other options are also available, like extending the current node; that can be a useful strategy in some cases to reduce memory fragmentation. We will talk about this later.


Code snippet for split. It is simple array manipulation and makes sure that each node is independent and a value lives in only one node.


public boolean hasCapacity(int count) {
    return length + count < members.length;
}

public OrderedIntCollection split() {
    int half = members.length / 2;

    // Split and allocate a new node holding the upper half of the values
    int[] copy = new int[1 + maxElement];
    int newLength = this.length - half;
    System.arraycopy(members, half, copy, 0, newLength);
    copy[newLength] = Integer.MAX_VALUE;
    OrderedIntCollection collection = new OrderedIntCollection(maxElement, copy, newLength);

    // Update the current node to keep only the lower half
    this.members[half] = Integer.MAX_VALUE;
    this.length = half;
    this.max = members[half - 1];
    Arrays.fill(members, half + 1, members.length, 0);

    return collection;
}
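A rough sketch of how hasCapacity and split could fit together on the write path; the surrounding node-management code here is an assumption, the real wiring is shown in the skip-lane section below:

// Hypothetical write path: split the node when it runs out of capacity.
OrderedIntCollection node = new OrderedIntCollection(8);
for (int value : new int[]{10, 5, 42, 7, 99, 1, 23, 64, 77}) {
    if (!node.hasCapacity(1)) {
        OrderedIntCollection upperHalf = node.split();
        // Register upperHalf in the node list (skip lane) and pick the node
        // whose [min, max] range covers the value before inserting.
        if (value >= upperHalf.min()) {
            node = upperHalf;
        }
    }
    node.insert(value);
}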


Let's move to the final piece, the skip lanes.

With the split process we can now have multiple nodes, and we have to keep track of all of them and maintain their order.

A few things are required for the lane data structure:

- A dynamic list that allows adding elements at any position.

- Ordering info for comparing nodes, to identify which node comes first.

- Some way of finding which node will contain a value.

- An efficient algorithm to search for values across all the available nodes.


Let's enhance OrderedIntCollection to add ordering info

public int max() {
    return max;
}

public int min() {
    return min;
}

@Override
public int compareTo(Integer value) {
    if (value < min) return 1;
    else if (value > max) return -1;
    return 0;
}

The min/max and compareTo functions provide all the ordering info.

compareTo will later be used to find which node can contain a value.

With the help of compareTo we can now store all the nodes in some array-based data structure.

At a high level it might look something like below.

B+ Skip List




This layout has some nice properties:

- Keys are not duplicated.

This is a big gain and helps in reducing memory requirements. The only additional memory allocation is for the list that maintains references to the nodes.

- Child nodes are just at the 2nd level.

Child nodes can always be reached in just 2 hops, as compared to n (tree depth) hops in a B+Tree or SkipList.

- Fanout of a node is good.

It gets the same fanout as a B+Tree without the overhead of additional internal nodes, and fanout plays an important role in reads/writes.

- Data locality in a single node.

Some of the discussion after the RUM paper was published was about adding Cache as another dimension for evaluating data structures, making it CRUM.

Packing everything together helps with the "C" part as it provides better cache locality, and this is one of the big gains because it can take advantage of the processor's cache lines.

Since node references are stored in a list-based structure and are kept in order, binary search can be used to identify the node that is the target of a read or write.

Reads/writes can be done in log N time. Once a node is identified, a sequential scan is used to locate the value, which adds some cost, but binary search can also be used within the node to reduce that overhead.

Trade off

Some of the trade-offs:

Binary search can't be used 100% of the time for writes.

Read requests can always be served by binary search, but for writes we need a hybrid approach. For example, in the sample above, if a value is inserted that does not fall in any node's range, then a sequential search is required to find the right node. Some sample values that would trigger a sequential search are 1 (i.e. below the min value), 300 (i.e. above the max value), and 105 (i.e. falling between node 2 and node 3).

Fragmentation

Due to the splitting rules we might see many nodes that are half filled, but that issue can be solved by relaxing the splitting rule, e.g. allowing a choice between split or extend based on data distribution.

Compacting nodes by merging their content is another option.

Code snippet of the skip lane.
public class OrderedInts {
    private final List<OrderedIntCollection> values = new ArrayList<>();
    private final int blockSize;
    private final AtomicLong bsHit = new AtomicLong();
    private final AtomicLong seqHit = new AtomicLong();

    public OrderedInts(int blockSize) {
        this.blockSize = blockSize;
    }

    public void insert(int value) {
        OrderedIntCollection e = search(value);
        e.insert(value);
    }

    private OrderedIntCollection search(int value) {
        if (values.isEmpty()) {
            OrderedIntCollection last = new OrderedIntCollection(blockSize);
            values.add(last);
            return last;
        } else {
            int position = binarySearch(values, value);
            if (position >= 0) {
                bsHit.incrementAndGet(); // Value falls within an existing node's [min, max] range
                return splitIfRequired(value, position, values.get(position));
            } else {
                seqHit.incrementAndGet(); // Outside all ranges, fall back to sequential search
                return sequentialSearch(value);
            }
        }
    }
}
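The binarySearch, sequentialSearch and splitIfRequired helpers are not shown above. Here is a minimal sketch of the first two, assuming the compareTo contract defined earlier (0 when a value falls inside a node's [min, max] range); treat them as illustrations rather than the actual implementation:

// Hypothetical helper: binary search over the ordered node list using compareTo;
// returns the node index when a node's [min, max] range covers the value, otherwise -1.
private int binarySearch(List<OrderedIntCollection> nodes, int value) {
    int low = 0, high = nodes.size() - 1;
    while (low <= high) {
        int mid = (low + high) >>> 1;
        int cmp = nodes.get(mid).compareTo(value);
        if (cmp < 0) low = mid + 1;        // Node range is below the value
        else if (cmp > 0) high = mid - 1;  // Node range is above the value
        else return mid;                   // Value falls inside this node's range
    }
    return -1;
}

// Hypothetical helper: walk the node list to find the first node that can absorb the value,
// used when the value falls outside every existing [min, max] range.
private OrderedIntCollection sequentialSearch(int value) {
    for (OrderedIntCollection node : values) {
        if (value <= node.max()) {
            return node;
        }
    }
    return values.get(values.size() - 1); // Larger than everything: append to the last node
}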

Conclusion

In computer science we only have trade-offs; nothing comes for free.
The RUM way of looking at data structures provides key insights into those trade-offs and also suggests some ideas for new data structures.
 

Wednesday 16 December 2020

Sparse Matrix

Vectors are a very powerful data structure that allows writing/reading data using an index. Vectors can be combined to create a matrix. Matrices are used in numerical analysis.

One of the problems with matrices is that most real world matrices are sparse, and the traditional way of allocating 2-dimensional arrays to represent a matrix can waste lots of space and also needs lots of upfront memory.

In this post I will share some ways of storing a sparse matrix.

I will use int values for illustration, but these techniques can be used for any data type.

Dense Matrix

This is the most straightforward and default way. It is also the most efficient for read and write access, but the full matrix needs to be allocated before it can be used.




public class IntDenseVectorValue {

    final int[][] values;

    public IntDenseVectorValue(int xSize, int ySize) {
        this.values = new int[xSize][ySize];
    }

    public void set(int x, int y, int value) {
        values[x][y] = value;
    }

    public int get(int x, int y) {
        return values[x][y];
    }
}

Many real world data sets are sparse, so lots of space gets wasted with this approach.

Block Dense Matrix

This technique is based on creating small blocks of width W, where each block is a small 2-dimensional array.

During set/get, the X coordinate is used to identify the block number.



This approach avoids upfront allocation and is a very good fit for incremental data load. An indirection layer is added: a Map is used to locate the actual block. Performance-wise this is the same as a dense vector but with less memory overhead.

Code snippet for block dense matrix.

public class IntDenseBlockVectorValue {
    final int blockSize;
    final int width;
    final int length;
    final Map<Integer, int[][]> values = new HashMap<>();

    public IntDenseBlockVectorValue(int x, int y, int blockSize) {
        this.length = x;
        this.width = y;
        this.blockSize = blockSize;
    }

    public void set(int x, int y, int value) {
        check(x, y);
        int blockIndex = x / blockSize; // X coordinate decides which block owns the row
        int[][] block = values.computeIfAbsent(blockIndex, index -> new int[blockSize][width]);
        int rowIndex = x % blockSize;
        block[rowIndex][y] = value;
    }

    private void check(int x, int y) {
        if (x >= length || y >= width) {
            throw new ArrayIndexOutOfBoundsException(String.format("Index [%s,%s] does not exist", x, y));
        }
    }

    public int get(int x, int y) {
        check(x, y);

        int blockIndex = x / blockSize;
        int[][] block = values.get(blockIndex);
        if (block == null) {
            return 0; // Block never written, default to zero
        }
        int rowIndex = x % blockSize;
        return block[rowIndex][y];
    }
}

Sparse Row Matrix

This technique takes block allocation to the next level by allocating at the row level.

The map contains an entry for every row, and a single-dimension vector is used as the value.


This helps in reducing wastage to a great extent. If a row is never added then it never gets allocated.

Code snippet


public class IntSparseRowVectorValue {
    private final Map<Integer, int[]> values = new HashMap<>();
    private final int width;

    public IntSparseRowVectorValue(int width) {
        this.width = width;
    }

    public void set(int x, int y, int value) {
        int[] row = values.computeIfAbsent(x, index -> new int[width]);
        row[y] = value;
    }

    public int get(int x, int y) {
        int[] row = values.get(x);
        return row == null ? 0 : row[y]; // Missing row means the cell was never set
    }
}


Sparse Row/Column Matrix

This decomposes the row vector into individual columns and gives the ultimate memory gain, with some trade-off in read/write access.

This is modeled as a Map of Maps.



Code snippet

public class SparseRowColVectorValue {

    private final Map<Integer, Map<Integer, Integer>> values = new HashMap<>();

    public void set(int x, int y, int value) {
        Map<Integer, Integer> row = values.computeIfAbsent(x, index -> new HashMap<>());
        row.put(y, value);
    }

    public int get(int x, int y) {
        Map<Integer, Integer> row = values.get(x);
        return row == null ? 0 : row.getOrDefault(y, 0); // Missing row/column means the cell was never set
    }
}

One thing to note in this approach is that the JDK's default Map needs wrapper objects for primitive types, which can result in some performance overhead due to boxing/unboxing. This issue can be solved by using a primitive map.
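As an illustration only (not part of the original snippet), the inner Map<Integer, Integer> could be swapped for a primitive map from a library such as Eclipse Collections; the class and method names below are assumptions that should be checked against the library's documentation:

import java.util.HashMap;
import java.util.Map;
import org.eclipse.collections.impl.map.mutable.primitive.IntIntHashMap;

// Hypothetical variant backed by a primitive int-to-int map to avoid Integer boxing for column values.
public class PrimitiveSparseRowColVectorValue {

    private final Map<Integer, IntIntHashMap> values = new HashMap<>();

    public void set(int x, int y, int value) {
        IntIntHashMap row = values.computeIfAbsent(x, index -> new IntIntHashMap());
        row.put(y, value);
    }

    public int get(int x, int y) {
        IntIntHashMap row = values.get(x);
        return row == null ? 0 : row.getIfAbsent(y, 0); // 0 when the cell was never set
    }
}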

 

Conclusion

Each sparse matrix representation is special purpose and should be chosen based on the use case. All of these types can be mixed by adding an abstraction on top that dynamically selects the right matrix implementation based on the underlying data.
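One possible shape for such an abstraction, a sketch with hypothetical names, is a small common interface that all the implementations above could share:

// Hypothetical common abstraction over the matrix implementations shown above.
public interface IntMatrix {
    void set(int x, int y, int value);
    int get(int x, int y);
}

// Each class (IntDenseVectorValue, IntDenseBlockVectorValue, IntSparseRowVectorValue,
// SparseRowColVectorValue) could implement IntMatrix, and a factory could pick one based on
// expected density, e.g. dense for small/full matrices, sparse row/column for mostly-empty ones.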



 







Sunday 13 December 2020

Sorting under constraint algorithms

In the compiler world, code inlining is the most important optimization; it enables other optimizations like dead code elimination, prefetching, out of order execution, etc.

Similarly, sorting also enables many optimizations like binary search, streaming aggregation, range search operations, prefix compression, run length encoding, delta encoding, understanding trends, posting lists, data partitioning, etc.



Sorting is memory and compute intensive work and many times we don't have enough compute/memory to do it.

In this post I will share 2 sorting algorithms that can be used when a system has memory or CPU constraints.

Disk based sorting

Say we have a file containing 1 TB of data and we want to sort it. The data is too huge to use a standard in-memory sorting algorithm.

One way to handle sorting of such data is to split it into chunks, sort each chunk in memory, persist the chunk to disk, and finally merge the sorted chunks using a k-way merge algorithm.


At a high level the sort pipeline will look something like below.



A nice thing about this algorithm is that it is embarrassingly parallel.

This algorithm is also a good example of divide and conquer, and the technique can be applied at both stages.

This algorithm has 2 stages in the pipeline that can be executed in parallel to take advantage of multiple cores.

Let's assume that the input file contains 10 million records; it can then be decomposed in the split stage as shown below.


In the merge stage we have to do the reverse operation: take multiple sorted files and create a single one.




Split & merge have different compute/disk requirements, and it is possible to make both stages parallel or just one, based on the constraints.

Overall the sort pipeline will look like below.



This algorithm is used by many databases to manage result sets that can't fit into memory.
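A minimal sketch of the split stage, assuming line-based records that sort lexicographically, a chunk size that fits in memory, and the usual java.io/java.nio.file/java.util imports; the merge stage is covered next:

// Hypothetical split stage: read the input in chunks, sort each chunk in memory,
// and write every sorted chunk to its own temporary file.
static List<Path> splitAndSort(Path input, int chunkSize) throws IOException {
    List<Path> chunks = new ArrayList<>();
    try (BufferedReader reader = Files.newBufferedReader(input)) {
        List<String> buffer = new ArrayList<>(chunkSize);
        String line;
        while ((line = reader.readLine()) != null) {
            buffer.add(line);
            if (buffer.size() == chunkSize) {
                chunks.add(writeSortedChunk(buffer));
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            chunks.add(writeSortedChunk(buffer));
        }
    }
    return chunks;
}

static Path writeSortedChunk(List<String> buffer) throws IOException {
    Collections.sort(buffer); // In-memory sort of a single chunk
    Path chunk = Files.createTempFile("sort-chunk-", ".txt");
    Files.write(chunk, buffer);
    return chunk;
}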

The important logic in this algorithm is the k-way merge of sorted results. If K is 2 then it is straightforward.

2 Way merge

The merge process picks the head from both iterators, adds the smaller value to the output, and advances the iterator whose value was added.

Some edge conditions need handling to avoid buffer overflow while reading and to handle iterators of different sizes.


v1.next();
v2.next();

while (v1.hasNext() && v2.hasNext()) {
    value1 = v1.value();
    value2 = v2.value();
    if (isLessThan(value1, value2)) {
        buffer.add(value1);
        v1.next();
    } else {
        buffer.add(value2);
        v2.next();
    }
}

// Drain whichever iterator still has values left
if (v1.hasNext()) {
    append(buffer, v1);
} else if (v2.hasNext()) {
    append(buffer, v2);
}

K Way merge

Assume K is 4; then one way to merge is to split the whole list into pairs, keep merging in pairs, and finally merge the outputs of the 2-way merges. This works but can't take advantage of batching multiple iterators.

The recommended way is to use a heap of K values. This is more efficient as we can process multiple inputs in a single pass and also reduce IO overhead.

PriorityQueue<LineIterator> heap = ...
LineIterator itr;

while ((itr = heap.poll()) != null) {
    write(writer, itr.value());
    itr.next();
    if (itr.hasNext()) {
        heap.add(itr); // Re-insert the iterator so it is ordered by its new head value
    }
}
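The snippet assumes the priority queue is already seeded and ordered by each iterator's current value. A sketch of that setup, where LineIterator and chunkIterators are the hypothetical types/variables used above, and value() is assumed to return a Comparable such as String:

// Hypothetical setup: order iterators by their current head value and seed the heap
// with one iterator per sorted chunk file.
PriorityQueue<LineIterator> heap = new PriorityQueue<>(Comparator.comparing(LineIterator::value));
for (LineIterator chunk : chunkIterators) {
    chunk.next();            // Position on the first value
    if (chunk.hasNext()) {
        heap.add(chunk);
    }
}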

BitMap Sorting

Bitmap is a powerful data structure for searching, and it has some interesting properties for sorting as well.

Consider a scenario where the file contains n positive integers and each value is less than K.

K can be really huge depending on the max value; to put this in context, just by using 256 MB of memory billions of int values can be sorted.

The idea is based on an allocated array of words (i.e. 32 or 64 bits each). If we use 32-bit words then 32 values can be tracked per slot, and the total capacity of this data structure is 32 * len(array).

Setting a bit needs 2 pieces of information: the slot in the array and the position within that slot.




Bit fiddling enables packing multiple values into a single word; if you want to read more on bit fiddling then refer to bit-fiddling.

In this example the word is 4 bytes, i.e. the word size is 32 bits.

Checking for a value is straightforward: it involves doing a bitwise AND on the slot value.

Complete working code 

public static final int NO_OF_BYTE = 4;
private final int WORD_SIZE = 8 * NO_OF_BYTE; // 32 bits per slot
private final int SLOT_SHIFT = NO_OF_BYTE + 1; // 5, and 2^5 == WORD_SIZE
private final int[] values;
private final int maxValue;

public BitMapSort(int maxValue) {
    this.maxValue = maxValue;
    this.values = new int[1 + maxValue / WORD_SIZE];
    logUsageInfo(maxValue);
}

public void set(int v) {
    this.values[slot(v)] |= position(v);
}

public boolean check(int v) {
    int value = this.values[slot(v)] & position(v);
    return value != 0;
}

public int position(int v) {
    return 1 << (v & WORD_SIZE - 1); // Bit position within the slot
}

public int slot(int v) {
    return v >> SLOT_SHIFT; // Slot index within the array
}
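A rough usage sketch under the stated constraints (distinct positive ints below a known max); walking the bits in index order yields the values in sorted order:

// Hypothetical usage: set the bit for every input value, then walk the bits in order.
BitMapSort bitmap = new BitMapSort(1_000_000);
for (int v : new int[]{42, 7, 999_998, 1, 512}) {
    bitmap.set(v);
}
// Checking each index in order produces the sorted sequence.
for (int v = 1; v <= 1_000_000; v++) {
    if (bitmap.check(v)) {
        System.out.println(v); // Prints 1, 7, 42, 512, 999998
    }
}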


The whole pipeline will look something like below.


Trade Off


Nothing is perfect, and this approach also has some constraints; it is good to be aware of them.

- This is a dense value data structure, so if we have to store a value as large as 100 million then we have to allocate at least 100 million bits (about 12 MB). If values are sparse then find an alternate data structure.

- Thread safety has to be handled at the slot level because 32 values are packed into a single slot.

- Values should be distinct. If duplicate values are present and the number of duplicates is small, an additional data structure like a map can be used to keep a frequency count. This needs to be handled a bit intelligently, e.g. having some threshold on duplicate values and, once that threshold is crossed, stop accepting values to avoid everything going into the map.

- Iteration. Since this is a compressed representation of dense values, iteration over the available values has to be handled in a streaming fashion to avoid allocating a huge in-memory collection. One approach could be an API that consumes a single value at a time and lets the client decide what to do with the values; such an iteration could look something like below.

public void consume(IntConsumer consumer) {
    IntStream
        .range(1, maxValue)
        .filter(this::check)
        .forEach(consumer::accept);
}

- Range iteration. This data structure is very good for range queries.

- Compact set. This is also a good data structure for set related operations.

Conclusion

These are simple yet very powerful algorithms, and if they fit the bill then they can be the difference between solving the problem and not solving it at all.

Tuesday 8 December 2020

Disk storage algorithm

This is a follow-up post to the rethinking-key-value-store article, exploring the storage part of the system.



Many data systems support plugin-based storage layers, which opens up a whole set of options: use one off the shelf or build one that suits your needs.

In this post I will share how a new storage system can be built; later it is used to build a time series application on top of it.

Before we go deep into disk based algorithms, let's look at why non-volatile storage is required.

In today's times, when machines with terabytes of RAM are available, why do we have to bother to store stuff on disk?

A couple of good reasons why having a good non-volatile storage manager still makes sense today.

  • It is cheap

Disk is very cheap compared to RAM, so it does not make sense to store data in an expensive store, especially when the data is not being used frequently. Lots of cloud provider bills can be saved!

  • It is unlimited

Although machines with big RAM are available, RAM is still limited; it will not continue to get bigger at the same rate as in the past, and if we want applications to have the illusion of unlimited memory then flushing to disk is a must.

  • Allow fast restarts
Think about what happens if the application crashes: it has to rebuild its whole state, and it could take a very long time before the application is available to serve requests again. Saving computed data to disk and restoring from it is way faster.

  • Information exchange 

How can two applications running on different machines communicate? For inter-application communication, in-memory data has to be written in a wire format so that it can be sent over the network.

and many more reasons..

An application has volatile & non-volatile areas, and the storage manager sits in the middle (i.e. between RAM and disk) and provides efficient access to data.





RAM and disk are very different types of hardware, and their access patterns are also very different.

On one hand, RAM can be accessed randomly and is fast for both reads and writes.

Disks are accessed sequentially using blocks, with very slow writes and slow reads; SSDs have improved access times, but sequential access is still recommended to get the best performance.

Storage managers have to use efficient on-disk data structures to get the best performance. Another thing is that disk has nothing like malloc to manage allocation: everything is bare metal, and the application developer has to manage allocation, garbage collection, locks, etc.

Disk read/write access is based on blocks, usually 4 KB, while memory read/write is based on cache lines of 64 bytes; just this difference in read/write size requires new ways of organizing data to get the best out of the device.

All the above problems make writing disk based algorithms very challenging.

Let's look at some options for storing data on disk.

Generally a file on disk looks something like below; each block is of fixed size, which depends on the hardware, and most vendors use 4 KB blocks. The IO device provides atomic read/write guarantees at the block level.



Page Layout

Let's unpack a disk block to explore options for storing data.

Fixed Size

Fixed size data is a very common and intuitive way to store data in a block, provided the underlying data allows it; it is mostly applicable to numeric data types (byte, short, int, long, float & double). It is possible to make it work for varchar, but padding is required. If the underlying data can be mapped to fixed size values then this is the best option.


Fixed size gives good random access: a specific record can be found with a simple multiplication, e.g. to find the 3rd record we use record index * sizeof(record) (i.e. 3 * 4) to compute the offset of the data and read it. A minimal sketch of this arithmetic is shown below.
Most applications, however, have variable record sizes, so a more flexible storage layout is required.
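A minimal sketch of that offset arithmetic; the page size and record type here are assumptions for illustration:

import java.nio.ByteBuffer;

// Hypothetical fixed-size page: a 4 KB block holding 4-byte int records back to back.
class FixedSizePage {
    private final ByteBuffer page = ByteBuffer.allocate(4096);

    // Write record at index i: offset = i * sizeof(record)
    void write(int index, int value) {
        page.putInt(index * Integer.BYTES, value);
    }

    // Random access read uses the same multiplication, e.g. index 3 reads bytes 12..15.
    int read(int index) {
        return page.getInt(index * Integer.BYTES);
    }
}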


Size Prefix

In this approach every record is prefixed with a 4-byte size, followed by the data.
This has an overhead of an extra 4 bytes per record, which can sometimes be more than the actual data; another downside is that access is sequential, so if the last record is required then the full page has to be scanned.
One more downside is what happens when records are updated: that causes overflow or fragmentation.




This is good for queue based systems where writes always go to the end and reads are large sequential scans.

Slotted Page

This approach is a hybrid one and takes advantage of both the fixed size and size prefix pages.





A slotted page is a transformation of the size prefix page that co-locates related data: all the data is together and all the sizes are together.

A single page contains 2 regions:
  • Header Region
This section contains some metadata about the page, including version, page id, hash, number of records, compression flag, etc.
 
  • Data Region
The data section is subdivided into a data segment and an index segment. The index segment is also called the Slot Array; it consists of fixed size (4-byte or 2-byte) values that contain pointers into the data segment.
The data segment is written from the left and the slot array is written from the right side. The page is considered full once no space is available for either the data segment or a slot.

This approach gives random access to records via the slot array: every record can be addressed by (PageId, RecordId), so the full file content can be seen as a 2-dimensional array. A sketch of such a page follows below.
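A minimal sketch of a slotted page; the sizes, header layout and method names here are assumptions for illustration, not the layout of any particular database:

import java.nio.ByteBuffer;

// Hypothetical slotted page: data grows from the left, the slot array grows from the right.
class SlottedPage {
    private static final int PAGE_SIZE = 4096;
    private static final int HEADER_SIZE = 8;   // e.g. version + record count
    private static final int SLOT_SIZE = 4;     // each slot stores the record's start offset

    private final ByteBuffer page = ByteBuffer.allocate(PAGE_SIZE);
    private int dataEnd = HEADER_SIZE;          // next free byte in the data segment
    private int recordCount = 0;

    // Returns the slot id (record id within this page), or -1 if the page is full.
    int write(byte[] record) {
        int slotOffset = PAGE_SIZE - (recordCount + 1) * SLOT_SIZE;
        if (dataEnd + record.length > slotOffset) {
            return -1; // No room for the record plus its slot entry
        }
        page.position(dataEnd);
        page.put(record);
        page.putInt(slotOffset, dataEnd);       // Slot points at the record's start offset
        dataEnd += record.length;
        return recordCount++;
    }

    // Record length is recovered from the gap between this slot's offset and the next one.
    byte[] read(int slotId) {
        int slotOffset = PAGE_SIZE - (slotId + 1) * SLOT_SIZE;
        int start = page.getInt(slotOffset);
        int end = (slotId == recordCount - 1) ? dataEnd : page.getInt(slotOffset - SLOT_SIZE);
        byte[] record = new byte[end - start];
        page.position(start);
        page.get(record);
        return record;
    }
}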


The slotted page is a very popular layout in many databases. It also allows building sparse or dense indexes based on page & slot.


Disk Data Structure 

Now we will explore how the smallest unit of storage (i.e. the page) can be used to build some data structures, and finally an application using a disk based data structure.

Remember, disk has no malloc API, so we have to build something like pagealloc that enables dynamic allocation of blocks/pages.


The page allocator interface is a low level API; it looks something like below.

public interface PageAllocator {

    WritePage newPage();

    long commit(WritePage page);

    ReadPage readByPageId(int pageId);

    int noOfPages();

    int pageSize();

    byte version();

    List<PageInfo> pages();

    ReadPage readByPageOffset(long offSet);
}

Application - Time Series Database

Using the page allocator abstraction we will build a time series database that uses a Sorted String Table (SSTable) as the underlying store.

An SSTable stores immutable rows ordered by some key in files. SSTable is the basis of the Log structured merge tree, which is an alternative to B+Tree.

Log structured merge trees power many popular data stores like Bigtable, HBase, LevelDB, RocksDB, WiredTiger, Cassandra, InfluxDB and many more.


SSTable

An SSTable is made of a couple of ordered in-memory maps and ordered rows on disk. The storage manager sits right in the middle to manage the sorted structures on disk & in memory.




Writers  

All write requests are handled by writing to an in-memory ordered map; once a map is full it is converted to a read-only in-memory map and periodically flushed to disk for durability.

Writing to such a system is very fast because it is done using in-memory data structure. 

Readers

Readers are where this gets more interesting, because now a read has to hit multiple data structures to find a record: first it scans the mutable map, then the immutable maps, and finally the disk.
Rather than doing a single seek it has to do multiple seeks, but since all the data structures, in memory or on disk, are sorted, requests can be processed in log N time.

Over a period of time the number of files can grow, so a compaction process is required that merges multiple sorted files into a small number of files. This compaction process is what makes an SSTable a Log structured merge tree.

Some code!
To have something working I used ConcurrentSkipListMap from the JDK to create the in-memory ordered map and used the PageAllocator to flush the ordered map to disk.
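A minimal sketch of that write path, assuming a flush threshold and a placeholder for the PageAllocator-backed writer; the real implementation is in the sst repo linked below:

import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical memtable: writes go to a sorted in-memory map and get flushed when it is full.
class MemTable<V> {
    private final ConcurrentSkipListMap<String, V> buffer = new ConcurrentSkipListMap<>();
    private final int flushThreshold;

    MemTable(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    void append(String key, V value) {
        buffer.put(key, value);
        if (buffer.size() >= flushThreshold) {
            flush();
        }
    }

    void flush() {
        // The map iterates in key order, so entries land on disk already sorted.
        buffer.forEach(this::writeToPage);
        buffer.clear();
    }

    private void writeToPage(String key, V value) {
        // Placeholder: serialize the entry into a WritePage obtained from PageAllocator.newPage()
    }
}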

SortedStringTable

public interface SortedStringTable<V> {

    void append(String key, V value);

    void iterate(String from, String to, Function<V, Boolean> consumer);

    // API for saving the SST table to persistent storage
    Collection<PageRecord<V>> buffers();

    void remove(int pageId);

    void flush();
}
Working SSTable code can be found @ sst github.


The first data structure for our time series database is ready :-)

Time Series 

The time series application will be built on top of the SSTable.


The Timeseries interface is simple; it looks something like below.

public interface TimeSeriesStore {

    <T> EventInfo insert(T row);

    void gt(LocalDateTime fromTime, Function<EventInfo, Boolean> consumer);

    void lt(LocalDateTime toTime, Function<EventInfo, Boolean> consumer);

    void between(LocalDateTime startTime, LocalDateTime endTime, Function<EventInfo, Boolean> consumer);

    void flush();
}
Time series application code can be found @ timeseries repo.
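A hedged usage sketch of this interface; the factory method, the row type and the meaning of the Boolean return value are assumptions, the real wiring is in the timeseries repo:

// Hypothetical usage of the TimeSeriesStore interface defined above.
TimeSeriesStore store = createStore(); // assumed factory for the SSTable-backed implementation

store.insert(new TaxiTrip("2020-01-01T09:15", 12.5)); // TaxiTrip is a hypothetical row type
store.flush();

// Scan all events in a time window; the Boolean return presumably controls whether iteration continues.
store.between(
        LocalDateTime.of(2020, 1, 1, 0, 0),
        LocalDateTime.of(2020, 1, 2, 0, 0),
        eventInfo -> {
            System.out.println(eventInfo);
            return true;
        });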

To experiment with some real time series data, I picked up sample data from the Jan Yellow Taxi Trip dataset and loaded it in the app. yellow_tripdata_2020-01 has 6+ million records.

Sample time series application using this data can be found @ NYTaxiRides.java

All the code has good unit test coverage, so feel free to hack and learn.

Conclusion

Disk based algorithms are very cool, and understanding them gives a good idea of how modern data systems work. You might not build a data system from scratch, but knowing these algorithms will definitely help in deciding which data system to pick based on its trade-offs.