Monday, 16 November 2020

Rethinking key value store

In this post I will share how key-value stores can be used for building different types of applications, but before we dive deep, let's do a quick recap of database systems.

Database technology is made of two important components: storage and query. This kind of architecture makes it possible to replace one part of the system, or to build only the storage or query part and reuse the other from open source.




Many modern databases take advantage of this pluggable architecture and focus on only one part of the system. Storage, for example, can be broken down further into various formats based on the read/write access pattern.


 

Each storage format (hash, B-tree, LSM or log) is optimized for a specific read/write access pattern, and many databases use multiple formats.

Once we get to the storage format, the row vs column layout also comes into play. In a nutshell, it is hard to build a storage system, so it often makes sense to use an off-the-shelf solution rather than building something from the ground up.

The query area is also huge, as it includes different types of APIs, the query engine, the optimizer, transactions, etc.



[Diagram: Database Engine - an API layer (SQL, Key Value) sitting on top of a Storage layer backed by Memory or Disk (Flash/SSD, HDD, Cloud).]

With this high-level understanding of databases, we can now focus on a specific type of storage system and explore what types of applications can be built on it.

The key-value store is a very popular storage option.




A key-value store is like a hash table designed to store and retrieve key/value tuples. Key-value stores can be memory or disk based. Some variations, like B-tree or LSM-tree based stores, are ordered key-value stores.
Ordered KV stores provide efficient range scans, which is very important if the client interface is SQL-like.

I will use an ordered KV store as the example to show the types of applications that can be built on it, and an e-commerce Order data model as the example for all the use cases.




- Row Store
  This is the most straightforward use case for any KV store and also a very good fit for building a unique index on the primary key. For the Order table, the KV store will contain entries like the ones below.

Order/OrderId/1 -> {......}
Order/OrderId/2 -> {......}
Order/OrderId/3 -> {......}

The important thing to note in this format is that the key is the fully qualified id of the record, made up of {table name}/{PK Column}/{PK Value}.

This type of key provides the flexibility to use a single KV store for multiple tables. Another advantage is that, since all values are ordered by key, a query has to load fewer pages to get all the data for a table.
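
A minimal sketch of this layout, using Java's TreeMap as a stand-in for an ordered KV store; the key format matches the entries above and the JSON values are illustrative.

import java.util.TreeMap;

public class RowStoreSketch {
    public static void main(String[] args) {
        // TreeMap keeps keys sorted, acting as a stand-in for an ordered KV store.
        TreeMap<String, String> store = new TreeMap<>();

        // Fully qualified key: {table name}/{PK Column}/{PK Value}
        store.put("Order/OrderId/1", "{\"orderId\":1,\"customerId\":100,\"status\":\"SHIPPED\"}");
        store.put("Order/OrderId/2", "{\"orderId\":2,\"customerId\":102,\"status\":\"PENDING\"}");

        // Point lookup by primary key
        System.out.println(store.get("Order/OrderId/2"));
    }
}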

Indexes
An ordered KV store can also be extended to build non-unique indexes, so it can be used as an efficient lookup table for the underlying data.

Let's try to build a couple of non-unique indexes on the Order table.

Index on customer id 

Order/CustomerId/100/1 -> {......}
Order/CustomerId/101/1 -> {......}
Order/CustomerId/102/2 -> {......}

Index on Order Date

Order/OrderDate/20201001/1 -> {......}
Order/OrderDate/20201001/2 -> {......}
Order/OrderDate/20201010/3 -> {......}

Before I show a more complex example, let's understand the format of the key. It follows the "{table name}/{Index Col}/{Index Value}/{PK}" pattern. This pattern allows duplicate values in the index because the primary key is the suffix of every entry.

Let's keep going with a slightly more complex example: an index by status and order date.

Order/StatusByOrderDate/CANCELLED_20201010/3 -> {......}
Order/StatusByOrderDate/CANCELLED_20201010/6 -> {......}
Order/StatusByOrderDate/PENDING_20201001/2 -> {......}
Order/StatusByOrderDate/PENDING_20201001/7 -> {......}
Order/StatusByOrderDate/PENDING_20201003/8 -> {......}
Order/StatusByOrderDate/SHIPPED_20201001/1 -> {......}

In the above example the index key is made of two columns.
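
A minimal sketch of how such index entries can be written alongside the row; the indexKey helper is a hypothetical convenience and the JSON value is illustrative.

import java.util.TreeMap;

public class SecondaryIndexSketch {
    // Hypothetical helper building "{table name}/{Index Col}/{Index Value}/{PK}" keys.
    static String indexKey(String table, String indexCol, String indexValue, String pk) {
        return table + "/" + indexCol + "/" + indexValue + "/" + pk;
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        String row = "{\"orderId\":3,\"status\":\"CANCELLED\",\"orderDate\":\"20201010\"}";

        // Primary row entry plus two index entries written together.
        store.put("Order/OrderId/3", row);
        store.put(indexKey("Order", "OrderDate", "20201010", "3"), row);
        // Composite index: the index value is built from two columns.
        store.put(indexKey("Order", "StatusByOrderDate", "CANCELLED_20201010", "3"), row);
    }
}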

- Range Scans

By this point it should be clear how efficient range scans can be done with the above index data.

Let's use some example queries to verify this.

 - Find all the orders that were cancelled between 20201001 and 20201015.
This query can be resolved by scanning all records between Order/StatusByOrderDate/CANCELLED_20201001 and Order/StatusByOrderDate/CANCELLED_20201015.

 - Find all the orders that were cancelled in October 2020.
All records matching the Order/StatusByOrderDate/CANCELLED_202010 prefix.

 - Find all the pending orders.
All records matching the Order/StatusByOrderDate/PENDING prefix.

One thing to note is that all related keys sit next to each other and can be retrieved using a small number of IO operations.
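
A minimal sketch of both query styles over the same ordered map; keys follow the index format above, and the date-range end key is 20201016 so that 20201015 stays inclusive. The prefix-scan loop is one common way to do it with a NavigableMap.

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class RangeScanSketch {
    // All orders cancelled between 20201001 and 20201015 (end key exclusive, hence 20201016).
    static NavigableMap<String, String> cancelledInRange(NavigableMap<String, String> index) {
        return index.subMap("Order/StatusByOrderDate/CANCELLED_20201001", true,
                            "Order/StatusByOrderDate/CANCELLED_20201016", false);
    }

    // Prefix scan: keys are ordered, so we can stop as soon as the prefix no longer matches.
    static void prefixScan(NavigableMap<String, String> index, String prefix) {
        for (Map.Entry<String, String> e : index.tailMap(prefix, true).entrySet()) {
            if (!e.getKey().startsWith(prefix)) break;
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }

    public static void main(String[] args) {
        NavigableMap<String, String> index = new TreeMap<>();
        index.put("Order/StatusByOrderDate/CANCELLED_20201010/3", "{...}");
        index.put("Order/StatusByOrderDate/PENDING_20201001/2", "{...}");
        index.put("Order/StatusByOrderDate/SHIPPED_20201001/1", "{...}");

        System.out.println(cancelledInRange(index));          // cancelled orders in the date range
        prefixScan(index, "Order/StatusByOrderDate/PENDING"); // all pending orders
    }
}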

- Partition Index
Distributed databases create partitions to split big data into manageable chunks so that it can be distributed and replicated.
Partitioning can be done by hash key or by key range. Key-range based partitioning is a must if the interface is SQL or allows range scanning.


Let's take the index data above as the example data set to distribute.

One way to partition is to create partitions of, say, 5 keys each and store the start and end key of each partition together with a reference to the data node, so that a query looking for data in a given range can be resolved quickly.

A partition index might look something like the sketch below.
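
A minimal sketch of such a partition index, again using a TreeMap; the partition boundaries and node names are illustrative.

import java.util.Map;
import java.util.TreeMap;

public class PartitionIndexSketch {
    static final class Partition {
        final String endKey;   // exclusive upper bound of this partition's key range
        final String node;     // data node that owns the range
        Partition(String endKey, String node) { this.endKey = endKey; this.node = node; }
    }

    public static void main(String[] args) {
        // Partition index keyed by the start key of each range.
        TreeMap<String, Partition> partitions = new TreeMap<>();
        partitions.put("Order/StatusByOrderDate/CANCELLED_20201010/3",
                new Partition("Order/StatusByOrderDate/PENDING_20201003/8", "node-1"));
        partitions.put("Order/StatusByOrderDate/PENDING_20201003/8",
                new Partition("Order/StatusByOrderDate/SHIPPED_20201001/1", "node-2"));

        // Route a lookup: the owning partition is the greatest start key <= search key.
        String searchKey = "Order/StatusByOrderDate/PENDING_20201001/2";
        Map.Entry<String, Partition> owner = partitions.floorEntry(searchKey);
        System.out.println("route to " + owner.getValue().node);
    }
}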



- Column Store
Another place where ordered KV stores shine is building a column store.

We need a slight change in the key format to achieve a column store: it uses the {table name}/{column}/{pk} -> {column value} format.


This type of key format puts all the values of a column for a given table together.
Any query that needs only a few columns can take advantage of this layout, as all the values of a column are next to each other.
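
A minimal sketch of the column layout; the values are illustrative, and the prefix range trick relies on '0' being the character right after '/' in the key ordering.

import java.util.TreeMap;

public class ColumnStoreSketch {
    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();

        // {table name}/{column}/{pk} -> {column value}: values of one column sit next to each other.
        store.put("Order/CustomerId/1", "100");
        store.put("Order/CustomerId/2", "102");
        store.put("Order/Status/1", "SHIPPED");
        store.put("Order/Status/2", "PENDING");

        // A query that only needs Status scans just that column's key range;
        // "Order/Status0" is an exclusive upper bound for the "Order/Status/" prefix.
        store.subMap("Order/Status/", "Order/Status0").values().forEach(System.out::println);
    }
}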
 
- SQL
The last one is building a SQL interface on top of an ordered map! A SQL query needs two basic operations to answer all requests:
 - Point lookup
 - Range scan

With the KV store layouts shown above, it is easy to build simple SQL support using these two access patterns.

Sample Application
As part of the research for this post, I built an implementation based on all the ideas shared above. The high-level architecture of this simple implementation looks something like the diagram below.



In this application I have used three implementations of an ordered map to show that the underlying storage can be changed without giving up any functionality.

- SkipList
This is Java's "ConcurrentSkipListMap", an in-memory ordered map that is used in many real open source databases to build the memory store on top of an LSM tree. Cassandra, for example, uses a skip list for its memtable.

- B-Tree
The H2 database is based on a B-tree storage engine called "MVStore", which can be used as a standalone library. I used MVStore as the B-tree storage.

- LSM Tree
Many implementations are available; popular ones are LevelDB, RocksDB, etc. I used RocksDB as it is already used as the storage engine in many databases such as MyRocks, CassandraRocks and CockroachDB.


The implementation is based on two interfaces (KeyValueStore & SSTable) that expose both a key-value and a SQL interface over the different storage backends.


public interface KeyValueStore {

    <Row_Type> SSTable<Row_Type> createTable(String tableName, Class<Row_Type> type,
                                             Map<String, Function<Row_Type, Object>> schema,
                                             Map<String, Function<Row_Type, String>> indexes);

    <Row_Type> SSTable<Row_Type> createTable(String tableName, Class<Row_Type> type,
                                             Map<String, Function<Row_Type, Object>> schema);

    <Row_Type> SSTable<Row_Type> createTable(TableInfo<Row_Type> tableInfo);

    List<String> desc(String table);

    void close();

    <Row_Type> SSTable<Row_Type> table(String tableName);

    default void execute(String sql, Consumer<RowValue> consumer) {
        new SqlAPI(this).execute(sql, consumer);
    }
}
 
public interface SSTable<T_TYPE> {
    List<String> cols();

    //Search functions
    void scan(Consumer<T_TYPE> consumer, int limit);

    void search(String indexName, String searchValue, Consumer<T_TYPE> consumer, int limit);

    void search(String indexName, String searchValue, Collection<T_TYPE> container, int limit);

    void rangeSearch(String index, String startKey, String endKey, Collection<T_TYPE> container, int limit);

    T_TYPE get(String pk);

    default Map<String, Function<T_TYPE, Object>> schema() {
        return null;
    }

    default Map<String, Function<T_TYPE, String>> indexes() {
        return null;
    }

    default Object columnValue(String col, Object row) {
        return null;
    }

    //Mutation functions
    void insert(T_TYPE row);

    void update(T_TYPE record); // Secondary index needs rebuilding
}
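
A hedged usage sketch of these two interfaces; the Order class, column names and index definitions are illustrative and not taken from the repository, and it assumes some KeyValueStore implementation is passed in.

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class OrderTableSketch {
    static class Order {
        long orderId; long customerId; String status; String orderDate;
    }

    static void demo(KeyValueStore store) {   // any KeyValueStore implementation
        Map<String, Function<Order, Object>> schema = new LinkedHashMap<>();
        schema.put("orderId", o -> o.orderId);
        schema.put("customerId", o -> o.customerId);
        schema.put("status", o -> o.status);

        Map<String, Function<Order, String>> indexes = new LinkedHashMap<>();
        indexes.put("customerId", o -> String.valueOf(o.customerId));
        indexes.put("statusByOrderDate", o -> o.status + "_" + o.orderDate);

        SSTable<Order> orders = store.createTable("Order", Order.class, schema, indexes);

        Order o = new Order();
        o.orderId = 1; o.customerId = 100; o.status = "SHIPPED"; o.orderDate = "20201001";
        orders.insert(o);

        // Point lookup by primary key and index-based search.
        Order byPk = orders.get("1");
        orders.search("customerId", "100", System.out::println, 10);
    }
}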


All the code is available on github @ query repo.

Each implementation is unit tested using contract-based tests, so feel free to experiment with it.

While implementing, I first built the KV-related functionality and added a simple SQL interface at the end, with the help of the Calcite SQL parser. Once the basic query primitives (point lookup and range scan) were available, the simple SQL layer was easy to add, and that is why SQL execution is exposed as a default "execute" method on the KeyValueStore interface.

Conclusion
Ordered KV stores are a powerful data structure and can be used to solve many interesting problems. Many commercial and open source databases use some of the techniques mentioned in this post.

In this post I have intentionally not covered how to handle different data types or how to optimize the index key. These problems can be solved by better encodings that still maintain the sort property of the key.

I plan to cover encoding rules in a future post.

Saturday, 7 November 2020

Private method or state testing in JVM

Unit testing private methods is not recommended, but sometimes we are in a situation where a unit test needs to inspect the private state of an object. As a guideline we should avoid such designs, but sometimes, especially when using a framework or library, we are left with no option.




I ran into one such case recently while writing unit tests around a Spark data frame. One feature required dataframe/dataset caching, and there was no easy way to verify whether caching had actually happened. I didn't want to add another layer of abstraction on top of the Spark API just to do this.

Code snippet that needs to be tested. 

sparkSession.read
  .parquet("....")
  .cache // Cache this DF/DS
  .createOrReplaceGlobalTempView("mysupertable")

Java has had excellent support for metaprogramming from day one, and reflection plays a big role in that. Reflection can be used in this scenario.

sparkSession.sharedState.cacheManager maintains the details of cached tables.

CacheManager is an internal Spark class and may change with a new Spark version, so a test based on internal details has a risk of breaking, but it also gives good insight into the internals of the framework.

Let's try to access the private state of CacheManager via Java reflection.

The code snippet below searches for fields matching a specific pattern and makes them accessible.

def fieldValue[T](fieldName: String, obj: Any, cls: Class[T]): T = {
  val field = {
    val matchedField = classOf[CacheManager].getDeclaredFields()
      .filter(_.getName().endsWith(fieldName))
      .map(f => {
        f.setAccessible(true)
        f
      })
      .head
    matchedField
  }

  field
    .get(obj)
    .asInstanceOf[T]
}

setAccessible(true) is the key thing; it is required for private or protected members.
One more thing to highlight is that "endsWith" is used for matching rather than an exact match, because the class under test is written in Scala and part of the field name is generated by the Scala compiler, so an exact match is not possible. Java reflection also has a getDeclaredField function that accepts an exact field name.

Once a member is marked as accessible, both read and write are possible for that field.

A sample call to this API looks something like below.

fieldValue("cachedData", sqlSession.sharedState().cacheManager(), LinkedList.class)
Unit test code checking cache can be something like below 

LinkedList cachedData = fieldValue("cachedData", sparkSession().sharedState().cacheManager(), LinkedList.class);
int before = cachedData.size();
loadVehicleTable();
int after = cachedData.size();
Assert.isTrue(after == before + 1);

Reflection is very powerful and comes in handy in such scenarios, but it comes with the trade-off of weaker type-safety guarantees, since methods and fields are referenced as strings. Still, it is better than not testing at all.
 
One more thing I discovered during this exercise is that the JVM spends some time checking field access, which adds overhead when reflection is used.
Turning off the access check, which is what setAccessible(true) does, also makes the code faster.

With this trick you can now start testing some of the private functions that you wished were public, or that were left public only for testing.

Tuesday, 20 October 2020

Histogram is not Instagram

A histogram is a very popular technique to represent the distribution of numerical data. This representation can be used for various things like measuring request latency, data cardinality, grouping data into bins, and the frequency density of values.







In this post I will share some common use cases for histograms.

Measuring latency

Assume that we want to measure the response time of an API call. The first thing that comes to mind is a stopwatch! The code snippet will look something like this.

StopWatch stopWatch = new StopWatch();
stopWatch.start();

//Some time consuming task

stopWatch.stop();

System.out.println(stopWatch.toString());

This is a good starting point, but it does not take us much further :-) For serious measurement we need more data points like the best response time, the worst, and the response time at various percentiles, for example how long 90% of requests take. Such a summary allows us to understand service latency in more detail and make better decisions. If data is available at different percentiles then it can be plotted like the chart below to understand it better.


For consumer-facing services, slow responses mean a bad user experience and loss of revenue. Industries like e-commerce and video streaming track response time proactively to retain customers.


Data cardinality

Tracking the cardinality of some dimension is very important in data-intensive applications. It can be used for identifying skew in data or for finding the optimal plan for executing a read or write request. One such example is the index statistics maintained by databases. A cost-based optimizer uses index stats to decide which index will result in faster execution.

For a time series database, tracking the distribution of data by time of day is useful for finding peak/busy times, and that information can drive custom partitioning logic to manage load. A naive way of doing this is a frequency map, but it will soon run into performance issues and also limits the options for range queries.

A histogram comes in very handy for such scenarios. The chart below shows a distribution by time.



Frequency density

This is a popular use case in the data science community. For example, an e-commerce company will be interested in knowing the spend bins of its customers, or a restaurant in the frequency of visits.

These metrics can help a business identify its most valuable customers and come up with a better strategy to retain them or acquire new ones. The example below, from Investopedia, shows population density by age, very useful information for a portfolio manager.



Histograms have many more interesting use cases. One cool property of a histogram is that it is commutative: the order of items makes no difference to the final output, which makes it possible to build histograms in parallel and merge them later.

Another important property of a histogram is that it is cumulative; this property allows cumulative calculations by merging multiple bins/buckets.


Code Snippet 

Now let's look at one histogram library that can be used for these use cases.

HdrHistogram by Gil Tene is a battle-tested library built for measuring application latency. I found it useful for other use cases as well. The library is very memory efficient, designed to work in low-latency environments, and has a very compact lossless serialization format.

Latency Measurement

Histogram histogram = new Histogram(2);
range(0, 1000_000)
    .forEach(v -> {
        long timeTaken = doSomething(v);
        histogram.recordValue(timeTaken);
    });
histogram.outputPercentileDistribution(System.out, 1.0);

The Histogram class is part of the HdrHistogram library and accepts various parameters. In this example we specify the number of significant digits to keep, which can range from 0 to 5; this precision has a direct impact on the size of the histogram.

Another important thing about this library is that the histogram auto-resizes based on the input values, which avoids large upfront allocation.
Running the above code will produce output like below.


Let's try to understand the output. It contains 4 columns (Value, Percentile, TotalCount, 1/(1-Percentile)).

Value - the recorded value at the given percentile.
Percentile - the cumulative fraction of records at or below this value.
TotalCount - the cumulative count of records at or below this value.

Let's pick some bucket values to understand how to use it.


50% of requests took 5 seconds - this metric is very common, but it behaves like an average/mean and is of little use on its own. If you want to do serious measurement then stay away from averages.

97.5% of requests took 9.7 seconds - this is where it becomes interesting: a significant share of requests take nearly double the median time. That is a really bad user experience.

99.86% of requests took 10+ seconds - this shows the real worst case; around 200K+ requests fall in this group and take double the time compared to the average.

Having latency broken down by percentile gives great insight into the real customer experience. If you are an e-commerce shop then every microsecond counts and can translate into a big gain or loss.
Trading applications are also sensitive to latency spikes, which make a big difference in how fast buy/sell trades can be matched.

Another thing that is very common in the microservices world is that rendering a full page requires hundreds of API calls, which adds the risk that at least one of those calls gets hit by the worst-case latency, so the overall user experience becomes bad :-(

Next time anyone talks about latency, ask for the 99.9999th percentile to know the real impact.
 

Data cardinality

Databases have rule-based and cost-based optimizers. A cost-based optimizer generates plans based on the cost of operations like joins and filters, and quickly selects the best one.
One of the important factors in deciding which plan to use is data distribution/statistics. Each index maintains cardinality stats so that the optimizer can quickly figure out which index to use.

Cardinality is also useful for finding data skew and coming up with a plan to handle it. Distributed computing frameworks like Apache Spark suffer from skew, and it is up to the engineer to add some intelligence to the code to avoid Spark job failures.

Trust me, real data is always skewed.

Let's take a scenario where an e-commerce giant wants to keep statistics like purchase volume at the merchant level, so that it can come up with better platform scaling options.

A histogram internally maintains frequency counts, so it can be used to calculate total volume per merchant efficiently.
For this scenario, let's assign a unique integer id to each merchant and record each purchase in the histogram by passing the merchant id.

Some code on how this can be done. 

Histogram histogram = new Histogram(2);
range(0, 1000_000 * 100)
    .forEach(v -> {
        int merchant = merchant();
        histogram.recordValue(merchant);
    });
histogram.outputPercentileDistribution(System.out, 1.0);

In the above example, merchant ids from 100 million transactions are used to build the histogram. To show some real skew, I have given 50% weight to one particular merchant (merchant id 101).

Running the above code will produce something like below.

 Value     Percentile TotalCount 1/(1-Percentile)

        1.00 0.000000000000       1043           1.00
      101.00 0.100000000000   50050299           1.11
      101.00 0.200000000000   50050299           1.25
      101.00 0.300000000000   50050299           1.43
      101.00 0.400000000000   50050299           1.67
      101.00 0.500000000000   50050299           2.00
    10047.00 0.550000000000   55023464           2.22
    20095.00 0.600000000000   60047899           2.50
    30079.00 0.650000000000   65038139           2.86
    40191.00 0.700000000000   70092127           3.33
    50175.00 0.750000000000   75086627           4.00
    55039.00 0.775000000000   77520056           4.44
 
50% of the transactions (i.e. 50 million) belong to merchant 101. This information can be used to optimize read or write queries, or to come up with some good product offerings.

One thing to remember is that histograms can be merged, so it is possible to compute histograms on multiple nodes and merge them later to get the full view; in fact, it is possible to build a histogram incrementally.
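
A small sketch of that merge step using HdrHistogram's add method; the per-node value ranges are illustrative.

import org.HdrHistogram.Histogram;

public class HistogramMergeSketch {
    public static void main(String[] args) {
        // Histograms built independently, e.g. one per node or per thread.
        Histogram nodeA = new Histogram(2);
        Histogram nodeB = new Histogram(2);
        for (long v = 1; v <= 1000; v++) nodeA.recordValue(v);
        for (long v = 500; v <= 1500; v++) nodeB.recordValue(v);

        // Merge into a single view; the order of the merge does not matter.
        Histogram merged = new Histogram(2);
        merged.add(nodeA);
        merged.add(nodeB);
        System.out.println("total samples: " + merged.getTotalCount());
    }
}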

Next time data skew is troubling you, try a histogram.

Frequency Density

This use case takes the histogram to the next level. Let's extend the e-commerce example: now we want to rank customers based on the total dollar value they spend.

A histogram comes in handy; we just have to feed it each customer's total spend and we get buckets that can be used for ranking customers.

Histogram histogram = new Histogram(2);
range(0, 1000_000 * 100)
    .forEach(v -> {
        long spend = spendValue();
        histogram.recordValue(spend);
    });
histogram.outputPercentileDistribution(System.out, 1.0);


Again taking 100 million purchases, this time recording the spend value. This creates buckets based on spend, and customers can later be ranked using these buckets.

The bucket output will look something like below.

 Value     Percentile TotalCount 1/(1-Percentile)

        0.00 0.000000000000       4963           1.00
     2007.00 0.100000000000   10036946           1.11
     4015.00 0.200000000000   20077808           1.25
     6015.00 0.300000000000   30075647           1.43
     8031.00 0.400000000000   40157837           1.67
    10047.00 0.500000000000   50238571           2.00
    11007.00 0.550000000000   55038969           2.22
    12031.00 0.600000000000   60159541           2.50
    13055.00 0.650000000000   65279759           2.86
    14015.00 0.700000000000   70077894           3.33
 
This output can be interpreted as: 10 million customers spend in the range of 0 to 2K, another 10 million spend 2K to 4K, and so forth. The higher buckets contain the premium customers.

Histograms have many more applications. They can be used in machine learning to build features, because they support vector-like arithmetic such as addition and subtraction.

All the code used in this post is available @ measure github

Sunday, 27 September 2020

Let the Stream flow

Java Streams is one of the most powerful features of the JDK, and it is built on lambdas.

This post is a quick refresher of Streams concepts using learning tests.



Streams are made up of

Source |> map |> filter |> reduce

Stream basics

A stream is a computation pipeline that starts with a source, applies a series of intermediate operations, and ends with a terminal operation.

A stream is expressed as a pipeline of functional transformations, which enables optimal execution strategies like lazy execution, short-circuiting and fusion of operations.

These execution strategies avoid unnecessary materialization of data, because many things are done in a single pass or by multiplexing.

Streams can also be seen as SIMD at the application layer. A stream is made of stateless and stateful operations; stateless operations are part of a single stage of the pipeline.

Stateless operations like map, flatMap and filter are fused to provide optimal execution; only stateful operations like sorted, takeWhile, dropWhile, limit and distinct add a barrier, i.e. a new stage in the pipeline.

Since a stream is a computation pipeline, it takes advantage of CPU caches by performing all transformations on a single element while it is hot in the cache. Sometimes this execution strategy is called depth first: go to the leaf and process it.

Accessing data while it is hot in the CPU cache makes a big difference in performance; you can read more about it in the cpu-cache-access-pattern post.

A stream ends with a terminal operation. Terminal operations are either short-circuiting (allMatch, findFirst, anyMatch) or non-short-circuiting (reduce, collect, forEach).

Short-circuiting causes early termination and is very useful for search-style operations.

Non-short-circuiting operations touch every element of the stream; reduce and collect are examples and allow solving very complex problems.

Streams favor reduction/folding over imperative accumulation; reductions are easy to parallelize and simple to understand. They also open up embarrassingly parallel opportunities.

Reduction also relies on the associativity property, for example:

(+ (+ (+ a b) c) d) = (+ (+ a b) (+ c d))

The example above is a reduction using the plus (+) operator; the left side is a sequential reduction and the right side a parallel one, but the output is the same for both execution paths.

Associative operators are embarrassingly parallel.
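
A small sketch of this: the same associative operator reduced sequentially and in parallel yields the same result.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AssociativeReduceSketch {
    public static void main(String[] args) {
        List<Integer> values = IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());

        // Sequential left-to-right fold.
        int sequential = values.stream().reduce(0, Integer::sum);
        // Parallel fold: chunks are reduced independently and then combined.
        int parallel = values.parallelStream().reduce(0, Integer::sum);

        // Because + is associative, both execution plans produce the same result.
        System.out.println(sequential + " == " + parallel);
    }
}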

The power of streams is in advanced reduction patterns, and the Collectors class has tons of examples.

A collector accepts a supplier, an accumulator and a combiner. These three things are composed to perform very complex reductions.

Let's look at a string-based reduction, the world-famous string joiner.

values
    .stream()
    .collect(StringBuilder::new, (sb, value) -> sb.append(value), (sb1, sb2) -> sb1.append(sb2))
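
The same concatenation also ships with the JDK as a ready-made collector; assuming values is a collection of strings, the one-liner below produces the joined result directly as a String.

String joined = values.stream().collect(java.util.stream.Collectors.joining());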


If the stream ends with a non-short-circuiting operation then records are processed in batches (forEachRemaining); in the short-circuiting case they are processed one at a time (tryAdvance).


Stream Operation Flags

Every stream has characteristics that the stream framework uses for optimization.
As application programmers we are not exposed to stream operation flags, but knowing them helps in understanding the optimization techniques used by streams.
  
A stream is defined with characteristics at the source, and each stream operation (map, filter, limit, sorted, etc.) may preserve, clear or inject new characteristics.
The terminal operation inspects all the characteristics and selects an optimized code path for execution.

A very simple stream characteristic is parallel; the stream framework takes it into account when deciding between single-threaded and multi-threaded execution.
Some of the other stream flags are:
  • Distinct - the stream has distinct values.
  • Sorted - elements are sorted by natural order.
  • Ordered - elements have an encounter order.
  • Sized - finite size, important for splitting.
  • Short circuit - the stream can be short-circuited, e.g. due to findFirst, limit, etc.
Let's take the distinct stream operation to understand how it gets optimized.
Once we have distinct elements, we can count them using the code snippet below.

values
    .stream()
    .distinct()
    .count()

A distinct count can be implemented in many ways based on the underlying collection of the stream.

  • List/LinkedList

If the underlying collection is a list then only the brute force way works for a distinct count: allocate a Set, keep adding elements to it and return its size. This puts memory pressure on the system when the collection is large.
 

  • SortedSet
If the underlying collection is a sorted collection like a TreeSet then the distinct count does not need any memory allocation; it can be computed with a simple loop that compares the current and previous values. A code snippet doing the distinct count:


static int distinct(SortedSet<String> values) {
    Iterator<String> itr = values.iterator();
    if (!itr.hasNext()) return 0;

    String previous = itr.next();
    int itemCount = 1;
    while (itr.hasNext()) {
        String next = itr.next();
        if (!previous.equals(next)) {
            itemCount++;
            previous = next;
        }
    }
    return itemCount;
}

  • Set
If the underlying collection is a Set then it is just a matter of calling size() on it!

This is a simple example of how a stream pipeline can take advantage of stream ops to plug in optimal code. It resembles the way a database optimizer works and shows the power of declarative programming.
  
We can take this further by adding a new stream op like approximate distinct, based on the HyperLogLog probabilistic data structure; it could handle any type of collection with very little memory overhead by trading off a little bit of accuracy. I shared some probabilistic data structures in the data-structure-for-big-data post.

The other flags also do a lot of magic to make code fast.

Conclusion

Streams are a very powerful abstraction for solving problems in a declarative way.
Enjoy the various examples of streams in the github project streams. The examples are organized in chapters and cover simple to advanced usage patterns.


Tuesday, 11 August 2020

Speak the language of domain - part 2

While researching for the post speak-language-of-domain, I spent some time building an external DSL and experimenting with a parser and compiler.

The parser and compiler are important components of an external DSL because they convert the DSL into an executable.


We will take a few examples of external DSLs and look at what is required to build a full solution.

Rule Engine

I will use the DSL below as the external DSL and try to convert it into an executable.



We allow users to upload DSL files and change the behavior of the system at runtime based on the rules in the DSL file. Such a feature makes the system very flexible and powerful.
Remember the Peter Parker principle, "With great power comes great responsibility", so use such things in a production system at your own risk!
     


external DSL pipeline


Loading a DSL file is a trivial thing; it can be uploaded to the system using a web interface.

Once a file is uploaded it needs to be parsed. This is where things become a little interesting; many libraries are available to parse files and extract tokens, but in this example we will do simple parsing and code generation.

For parsing, a template file is used; the content of the DSL file is put into the template file before compilation.

For this example I am going to use the template file below.


The template file above has 2 variables, $REPLACE and //CODE; these two variables are replaced during the parsing and code generation phase.

$REPLACE - will contain a random value to give a unique name to the block of code that needs to be compiled.

//CODE - will contain the DSL code submitted by the user.

Once the tokens are replaced we are ready for compilation.

For compilation we will use the JavaCompiler API, which gives a programmatic interface to the JVM compiler. It is a very powerful API for doing things inside a running VM.

A code snippet for compiling code using the JavaCompiler interface is sketched below.
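
A minimal sketch of this step, assuming the generated source is first written to a temporary directory; the class name, output directory and error handling are illustrative, not the ones used in the original project.

import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class RuleCompilerSketch {
    // Writes the generated source to disk and invokes the in-JVM compiler on it.
    static void compile(String className, String source) throws IOException {
        Path sourceFile = Paths.get("/tmp", className + ".java");
        Files.write(sourceFile, source.getBytes(StandardCharsets.UTF_8));

        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); // null when running on a JRE
        int result = compiler.run(null, null, null, sourceFile.toString());
        if (result != 0) {
            throw new IllegalStateException("Compilation failed for " + className);
        }
    }
}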


Once the code is compiled, the rule can be loaded using Java reflection.

A code snippet for the DSL loader is sketched below.
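
A minimal sketch of loading the freshly compiled class; the output directory and the assumption that the rule has a no-arg constructor are illustrative.

import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;

public class DslLoaderSketch {
    // Loads the compiled rule class from the output directory and instantiates it via reflection.
    static Object loadRule(String className) throws Exception {
        URL classDir = Paths.get("/tmp").toUri().toURL();
        URLClassLoader loader = new URLClassLoader(new URL[]{classDir});
        Class<?> ruleClass = loader.loadClass(className);
        return ruleClass.getDeclaredConstructor().newInstance();
    }
}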


Once the rule is loaded from the DSL, it is straightforward to update the current VM to use the newly created rule.

The DSL code is type safe, all checks are done at compile time, and the generated code is highly optimized because it has no reflection or other magic, so it benefits fully from the HotSpot compiler.

From the DSL creator's perspective, these rules can be tested in a sandbox before being used in production.

One thing that can be improved is the error message when the DSL has a compilation error; right now it just shows the Java error messages, which will not be very user friendly for non-Java programmers.

Trading System

In the earlier post the internal DSL below was used.

newOrder(equity("IBM"), (order, ts) -> {
order
.buy(100)
.at(126.07d);

ts.execute(order);
});

newOrder(equity("GOOG"), (order, ts) -> {
order
.sell(100)
.at(1506.85)
.partial();

ts.execute(order);
});

Let's build an external DSL for this.

buy 100 IBM stocks at 122.06
This has a few keywords related to the trading domain (buy, stocks, at) and also a few order parameters: 100 (units), IBM (stock) and 122.06 (price).
For this example it is easy to write a parser that creates an order object.
Refer to Parser for more details on how the text is parsed and the order object is created; a minimal sketch follows below.
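
A minimal sketch of such a parser; the Order class and field names are illustrative and not the ones used in the linked repository.

public class OrderDslParserSketch {
    static final class Order {
        final String side; final int quantity; final String symbol; final double price;
        Order(String side, int quantity, String symbol, double price) {
            this.side = side; this.quantity = quantity; this.symbol = symbol; this.price = price;
        }
    }

    // Parses statements of the form: "buy 100 IBM stocks at 122.06"
    static Order parse(String line) {
        String[] tokens = line.trim().split("\\s+");
        String side = tokens[0];                       // buy / sell
        int quantity = Integer.parseInt(tokens[1]);    // unit
        String symbol = tokens[2];                     // stock
        double price = Double.parseDouble(tokens[5]);  // price after the "at" keyword
        return new Order(side, quantity, symbol, price);
    }

    public static void main(String[] args) {
        Order order = parse("buy 100 IBM stocks at 122.06");
        System.out.println(order.side + " " + order.quantity + " " + order.symbol + " @ " + order.price);
    }
}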

All the code used in this post is available at DSL