Sunday, 27 September 2020

Let the Stream flow

Java Streams is one of the most powerful features of the JDK, and it is built on lambdas.

This post is a quick refresher of Stream concepts using learning tests.



Streams are made up of 

Source |> map | filter |> reduce

Stream basics

A stream is a computation pipeline that starts with a source, passes through a series of intermediate operations, and ends with a terminal operation.
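As a quick illustration of the three parts, here is a minimal sketch. The class and method names (PipelineBasics, sumOfEvenSquares) are made up for this post and are not taken from the github project.

```java
import java.util.List;

class PipelineBasics {
    // Source -> filter -> map -> reduce: sum of the squares of the even numbers.
    static int sumOfEvenSquares(List<Integer> numbers) {
        return numbers.stream()              // source
                .filter(n -> n % 2 == 0)     // intermediate: keep even values
                .map(n -> n * n)             // intermediate: square each value
                .reduce(0, Integer::sum);    // terminal: fold everything into one result
    }

    public static void main(String[] args) {
        System.out.println(sumOfEvenSquares(List.of(1, 2, 3, 4, 5))); // prints 20
    }
}
```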

A stream is expressed as a pipeline of functional transformations, which enables optimized execution strategies such as lazy execution, short-circuiting, and fusion of operations.

These execution strategies avoid unnecessary materialization of data, because much of the work is done in a single pass or by multiplexing.
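Lazy execution is easy to observe with a small learning test. The sketch below (LazyPipeline and touchedBeforeAndAfter are hypothetical names) records which elements the map stage touches, once before the terminal operation is called and once after it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class LazyPipeline {
    // Returns {elements touched before the terminal op, elements touched after it}.
    static int[] touchedBeforeAndAfter(List<Integer> numbers) {
        List<Integer> touched = new ArrayList<>();
        Stream<Integer> pipeline = numbers.stream()
                .map(n -> { touched.add(n); return n * 2; })
                .filter(n -> n > 10);
        int before = touched.size();   // the pipeline is only described so far
        pipeline.findFirst();          // the terminal operation starts the flow
        int after = touched.size();    // only the elements needed for the answer
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] result = touchedBeforeAndAfter(List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
        System.out.println(result[0] + " " + result[1]); // prints 0 6
    }
}
```

No intermediate list of doubled values is ever built; each element flows through map and filter in one pass.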

Streams can also be seen as SIMD at the application layer. A stream is made of stateless and stateful operations; consecutive stateless operations belong to a single stage of the pipeline.

Stateless operations such as map, flatMap, and filter are fused for optimal execution; only stateful operations such as sorted, takeWhile, dropWhile, limit, and distinct can add a barrier or a new stage to the pipeline.

Since a stream is a computation pipeline, it takes advantage of CPU caches by performing all the transformations on a single element while it is hot in the cache. This execution strategy is sometimes called depth first: go to the leaf and process it.
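The depth-first order, and the barrier that a stateful operation adds, can be seen by logging each step. This is only a learning-test sketch; ExecutionOrder and trace are hypothetical names, and the stream is sequential.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

class ExecutionOrder {
    // Logs the order in which each stage sees each element.
    static List<String> trace(List<Integer> numbers, boolean withSort) {
        List<String> log = new ArrayList<>();
        Stream<Integer> stream = numbers.stream()
                .map(n -> { log.add("map " + n); return n; });
        if (withSort) {
            stream = stream.sorted();   // stateful: must see all elements first
        }
        stream.filter(n -> { log.add("filter " + n); return true; })
              .forEach(n -> log.add("forEach " + n));
        return log;
    }

    public static void main(String[] args) {
        // Fused stages: one element travels the whole pipeline before the next starts.
        System.out.println(trace(List.of(2, 1), false));
        // prints [map 2, filter 2, forEach 2, map 1, filter 1, forEach 1]

        // sorted() is a barrier: every map call happens before anything downstream.
        System.out.println(trace(List.of(2, 1), true));
        // prints [map 2, map 1, filter 1, forEach 1, filter 2, forEach 2]
    }
}
```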

Accessing data while it is hot in the CPU cache makes a big difference in performance; you can read more about it in the post cpu-cache-access-pattern.

A stream ends with a terminal operation. Terminal operations are either short-circuiting (allMatch, findFirst, anyMatch) or non-short-circuiting (reduce, collect, forEach).

A short-circuiting operation causes early termination and is very useful for search-related operations.

A non-short-circuiting operation touches every element of the stream. reduce and collect are examples of such operations, and they can solve very complex problems.
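The difference between the two kinds of terminal operation shows up when we count how many elements each one pulls from the source. A small sketch, with hypothetical names (TerminalOps, visitedByAnyMatch, visitedByReduce):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class TerminalOps {
    // anyMatch is short-circuiting: it stops pulling elements at the first match.
    static int visitedByAnyMatch(List<Integer> numbers, int target) {
        AtomicInteger visited = new AtomicInteger();
        numbers.stream()
                .peek(n -> visited.incrementAndGet())
                .anyMatch(n -> n == target);
        return visited.get();
    }

    // reduce is not short-circuiting: it has to fold every element.
    static int visitedByReduce(List<Integer> numbers) {
        AtomicInteger visited = new AtomicInteger();
        numbers.stream()
                .peek(n -> visited.incrementAndGet())
                .reduce(0, Integer::sum);
        return visited.get();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(5, 3, 8, 1, 9);
        System.out.println(visitedByAnyMatch(numbers, 8)); // prints 3
        System.out.println(visitedByReduce(numbers));      // prints 5
    }
}
```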

Streams favor reduction/folding over imperative accumulation: reductions are easy to parallelize and simple to understand. This also opens up embarrassingly parallel opportunities.

Reduction relies on the operator being associative, for example

(+ (+ (+ a b) c) d) = (+ (+ a b) (+ c d))

The example above is a reduction using the plus (+) operator. The left side is the sequential reduction and the right side is the parallel one, but the output is the same for both execution paths.

Associative operators are embarrassingly parallel.
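A minimal sketch of why associativity matters, using made-up names (AssociativeReduction, sequentialSum, parallelSum). Plus gives the same answer for both groupings, while minus, which is not associative, does not.

```java
import java.util.stream.IntStream;

class AssociativeReduction {
    static int sequentialSum(int n) {
        return IntStream.rangeClosed(1, n).reduce(0, Integer::sum);
    }

    static int parallelSum(int n) {
        // The framework is free to regroup the additions across threads.
        return IntStream.rangeClosed(1, n).parallel().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        int a = 10, b = 4, c = 3, d = 2;
        System.out.println((((a + b) + c) + d) == ((a + b) + (c + d))); // prints true
        System.out.println((((a - b) - c) - d) == ((a - b) - (c - d))); // prints false
        System.out.println(sequentialSum(1000) == parallelSum(1000));   // prints true
    }
}
```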

The power of streams is in advanced reduction patterns, and the Collectors class has tons of examples.
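One such pattern is grouping with a downstream reduction. The sketch below counts words by length; GroupingExample and countByLength are hypothetical names, and TreeMap is used only to get a predictable key order.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupingExample {
    // Groups words by their length and counts how many fall into each group.
    static Map<Integer, Long> countByLength(List<String> words) {
        return words.stream()
                .collect(Collectors.groupingBy(String::length, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        System.out.println(countByLength(List.of("a", "bb", "cc", "ddd"))); // prints {1=1, 2=2, 3=1}
    }
}
```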

A collector is built from a supplier, an accumulator, and a combiner (and, optionally, a finisher). These pieces are composed to do very complex reductions.

Let's look at a String-based reduction by looking at the world-famous String joiner.

// values is assumed to be a List<String>
String joined = values
    .stream()
    .collect(StringBuilder::new, (sb, value) -> sb.append(value), (sb1, sb2) -> sb1.append(sb2))
    .toString();
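The same three pieces, plus a finisher, can be packaged as a reusable Collector with Collector.of. This is just a sketch: JoinCollector.joining is a hypothetical helper written for this post, and in real code the JDK's own Collectors.joining is the better choice.

```java
import java.util.List;
import java.util.StringJoiner;
import java.util.stream.Collector;

class JoinCollector {
    // Hypothetical helper: a delimiter-aware joiner assembled from the collector parts.
    static Collector<String, StringJoiner, String> joining(String delimiter) {
        return Collector.of(
                () -> new StringJoiner(delimiter),   // supplier: a fresh container
                (joiner, s) -> joiner.add(s),        // accumulator: fold one element in
                StringJoiner::merge,                 // combiner: merge two partial results
                StringJoiner::toString);             // finisher: container to final result
    }

    public static void main(String[] args) {
        System.out.println(List.of("a", "b", "c").stream().collect(joining(", "))); // prints a, b, c
    }
}
```

The combiner is only needed when the stream runs in parallel, but it is what lets the same collector work for both execution modes.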


If a stream ends with a non-short-circuiting operation, elements are processed in bulk (forEachRemaining); with a short-circuiting operation they are processed one at a time (tryAdvance).
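Both traversal styles are methods on Spliterator, so we can try them by hand. The sketch below is a simplification of what the framework does, with hypothetical names (SpliteratorTraversal, pulledUntilFound, drain).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;

class SpliteratorTraversal {
    // One element at a time: stop asking for more as soon as the target is seen.
    static int pulledUntilFound(List<String> values, String target) {
        Spliterator<String> source = values.spliterator();
        int[] pulled = {0};
        boolean[] found = {false};
        while (!found[0] && source.tryAdvance(v -> {
            pulled[0]++;
            found[0] = v.equals(target);
        })) {
            // nothing to do here, the work happens in the consumer
        }
        return pulled[0];
    }

    // Bulk traversal: every remaining element is pushed in a single call.
    static List<String> drain(List<String> values) {
        List<String> out = new ArrayList<>();
        values.spliterator().forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<String> values = List.of("a", "b", "c", "d");
        System.out.println(pulledUntilFound(values, "b")); // prints 2
        System.out.println(drain(values));                 // prints [a, b, c, d]
    }
}
```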


Stream Operation Flags

Every stream has some characteristics that the stream framework uses for optimization.
As application programmers we are not exposed to stream operation flags, but knowing them helps in understanding the optimization techniques used by streams.
  
A stream is defined with characteristics at its source, and each stream operation (map, filter, limit, sorted, etc.) may preserve, clear, or inject characteristics.
The terminal operation inspects all the characteristics and selects an optimized code path for execution.

A very simple stream characteristic is Parallel; the stream framework takes it into account to decide between single-threaded and multi-threaded execution.
Some of the other stream flags are
  • Distinct - Stream has distinct values.
  • Sorted - Elements are sorted by natural order.
  • Ordered - Elements have an encounter order.
  • Sized - Size is known and finite, important for splitting.
  • Short Circuit - Stream can be short-circuited, for example by find or limit.
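The internal flags are not public API, but a source advertises most of them through its Spliterator characteristics, which we can inspect. A small sketch, with hypothetical names (SourceCharacteristics, describe), that reports four of the flags above for a few common collections:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.StringJoiner;
import java.util.TreeSet;

class SourceCharacteristics {
    // Lists which of the four characteristics the collection's spliterator reports.
    static String describe(Collection<?> source) {
        Spliterator<?> sp = source.spliterator();
        StringJoiner flags = new StringJoiner(", ");
        if (sp.hasCharacteristics(Spliterator.DISTINCT)) flags.add("DISTINCT");
        if (sp.hasCharacteristics(Spliterator.SORTED)) flags.add("SORTED");
        if (sp.hasCharacteristics(Spliterator.ORDERED)) flags.add("ORDERED");
        if (sp.hasCharacteristics(Spliterator.SIZED)) flags.add("SIZED");
        return flags.toString();
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(3, 1, 2);
        System.out.println(describe(new ArrayList<>(data))); // prints ORDERED, SIZED
        System.out.println(describe(new HashSet<>(data)));   // prints DISTINCT, SIZED
        System.out.println(describe(new TreeSet<>(data)));   // prints DISTINCT, SORTED, ORDERED, SIZED
    }
}
```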
Let's take the distinct stream operation to understand how it gets optimized.
Once we have distinct elements, we can count them using the code snippet below.

values
    .stream()
    .distinct()
    .count()

A distinct count of elements can be implemented in many ways, depending on the underlying collection of the stream.

  • List/LinkedList 

 If the underlying collection is a list, only the brute-force approach can be used for the distinct count: allocate a Set, keep adding elements to it, and then return its size. This puts memory pressure on the system when the collection is large.
 

  • SortedSet
If the underlying collection is a sorted collection such as TreeSet, the distinct count needs no extra memory allocation; it can be computed with a simple loop that compares the current value with the previous one. The code snippet below does this distinct count.


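import java.util.Iterator;
import java.util.SortedSet;

// Counts distinct values in one pass over sorted input, without allocating a Set.
static int distinct(SortedSet<String> values) {
    Iterator<String> itr = values.iterator();
    if (!itr.hasNext()) return 0;

    String previous = itr.next();
    int itemCount = 1;
    while (itr.hasNext()) {
        String next = itr.next();
        // In sorted order equal values are adjacent, so a change marks a new distinct value.
        if (!previous.equals(next)) {
            itemCount++;
            previous = next;
        }
    }
    return itemCount;
}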

  • Set
If the underlying collection is a Set, the distinct count is just a call to size()!

This is a simple example of how a stream pipeline can take advantage of stream operation flags to plug in optimal code. It resembles the way a database optimizer works and shows the power of declarative programming.
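To make the idea of selecting a code path concrete, here is a hand-written sketch covering the List and Set cases. It is not how the JDK implements distinct(); DistinctCount and distinctCount are hypothetical names, and the check uses the public Spliterator characteristic as a stand-in for the internal flag.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Spliterator;
import java.util.TreeSet;

class DistinctCount {
    // Picks a strategy from what the source says about itself.
    static int distinctCount(Collection<?> values) {
        if (values.spliterator().hasCharacteristics(Spliterator.DISTINCT)) {
            return values.size();                   // already distinct: no work, no allocation
        }
        return new HashSet<Object>(values).size();  // brute force: copy into a Set
    }

    public static void main(String[] args) {
        List<String> words = new ArrayList<>(List.of("a", "b", "a", "c", "b"));
        System.out.println(distinctCount(words));                // prints 3
        System.out.println(distinctCount(new TreeSet<>(words))); // prints 3
    }
}
```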
  
 We can take this further by adding a new stream operation such as approximate distinct, based on the HyperLogLog probabilistic data structure. It can handle any type of collection with very little memory overhead by trading off a little accuracy. I shared some probabilistic data structures in the data-structure-for-big-data post.
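To give a feel for what such an operation could be built on, below is a heavily simplified HyperLogLog sketch. Everything in it is an assumption made for illustration: the class name ApproxDistinct, the choice of 4096 registers, and hashing through hashCode() followed by a 64-bit mixer. Nothing like this exists in the JDK stream API, and a production version would need a stronger hash and bias correction.

```java
class ApproxDistinct {
    private static final int P = 12;        // assumption: 2^12 = 4096 registers, about 4 KB
    private static final int M = 1 << P;
    private final byte[] registers = new byte[M];

    void add(Object value) {
        long hash = mix(value.hashCode());
        int index = (int) (hash >>> (64 - P));   // top P bits choose the register
        long rest = hash << P;                   // remaining bits decide the rank
        int rank = (rest == 0) ? (64 - P + 1) : Long.numberOfLeadingZeros(rest) + 1;
        if (rank > registers[index]) {
            registers[index] = (byte) rank;      // keep the largest rank seen per register
        }
    }

    long estimate() {
        double sum = 0;
        int emptyRegisters = 0;
        for (byte r : registers) {
            sum += Math.pow(2, -r);
            if (r == 0) emptyRegisters++;
        }
        double alpha = 0.7213 / (1 + 1.079 / M);
        double raw = alpha * M * M / sum;        // harmonic-mean based estimate
        if (raw <= 2.5 * M && emptyRegisters > 0) {
            raw = M * Math.log((double) M / emptyRegisters); // small-range correction
        }
        return Math.round(raw);
    }

    // SplitMix64-style finalizer to spread the 32-bit hashCode over 64 bits.
    private static long mix(long x) {
        x += 0x9E3779B97F4A7C15L;
        x = (x ^ (x >>> 30)) * 0xBF58476D1CE4E5B9L;
        x = (x ^ (x >>> 27)) * 0x94D049BB133111EBL;
        return x ^ (x >>> 31);
    }
}
```

Usage would look like values.forEach(sketch::add) followed by sketch.estimate(). With 4096 registers the expected error is in the low single-digit percent range, and the memory used stays fixed no matter how many elements are added.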

Other flags also do lots of magic to make code fast.

Conclusion

Streams are a very powerful abstraction for solving problems in a declarative way.
Enjoy the various examples of streams in the github project streams. The examples are organized in chapters and cover simple to advanced usage patterns.