Sunday, 11 December 2022

More on Dynamic proxy

This is a follow-up to the dynamic proxy post, sharing more realistic examples of dynamic proxies.

Today's software development is heavily reliant on system observability. Observability helps us to understand when the system degrades or misbehaves so that we can take proactive measures to fix it. 

  
The toy service for this example is FXService:

public interface FXService {

double convert(String from, String to, int amount);
}

This service will use https://api.exchangerate.host API for FX conversion.




Let's look at what types of dynamic proxies we can create on top of this.

Method Timing

It keeps track of method execution time and makes it available later for analysis or other purposes. Using this proxy, the top X slowest method calls can be reported.
FXService has only one method, so this proxy builds a method key from the method name and its parameters.


The method timing proxy reports the top X slowest calls, for example:


Method convert( SGD,IDR,1 ) took 1041 ms
Method convert( SGD,GBP,1 ) took 994 ms
Method convert( SGD,USD,1 ) took 983 ms
Method convert( SGD,IDR,1 ) took 672 ms
Method convert( SGD,INR,1 ) took 650 ms
Method convert( SGD,USD,1 ) took 593 ms
Method convert( SGD,JPY,1 ) took 593 ms
Method convert( SGD,GBP,1 ) took 582 ms
Method convert( SGD,USD,1 ) took 580 ms
Method convert( SGD,INR,1 ) took 566 ms

This type of proxy is very helpful in identifying an outage or degradation in an API.
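A minimal sketch of how such a timing proxy could be written is shown below. The names Fx, Timing, TimingHandler and TimingDemo are hypothetical stand-ins and are not the classes from the linked project; the real service is replaced by a lambda so that no network call is made.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

// Hypothetical stand-in for the FXService interface from the post.
interface Fx {
    double convert(String from, String to, int amount);
}

// One timing sample: the method key and the elapsed time in milliseconds.
final class Timing {
    final String key;
    final long millis;

    Timing(String key, long millis) {
        this.key = key;
        this.millis = millis;
    }
}

// Records the duration of every call and can report the slowest ones.
final class TimingHandler implements InvocationHandler {
    private final Object target;
    private final ConcurrentLinkedQueue<Timing> samples = new ConcurrentLinkedQueue<>();

    TimingHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        long start = System.nanoTime();
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause(); // rethrow what the real method threw
        } finally {
            long millis = (System.nanoTime() - start) / 1_000_000;
            samples.add(new Timing(key(method, args), millis));
        }
    }

    // Builds a key such as "convert( SGD,IDR,1 )" from method name and parameters.
    static String key(Method method, Object[] args) {
        String params = args == null
                ? ""
                : Arrays.stream(args).map(String::valueOf).collect(Collectors.joining(","));
        return method.getName() + "( " + params + " )";
    }

    // Returns the x slowest recorded calls, slowest first.
    List<Timing> slowest(int x) {
        return samples.stream()
                .sorted(Comparator.comparingLong((Timing t) -> t.millis).reversed())
                .limit(x)
                .collect(Collectors.toList());
    }
}

final class TimingDemo {
    static TimingHandler run() {
        Fx real = (from, to, amount) -> amount * 2.0; // fake rate, no network call
        TimingHandler handler = new TimingHandler(real);
        Fx fx = (Fx) Proxy.newProxyInstance(
                Fx.class.getClassLoader(), new Class<?>[]{Fx.class}, handler);
        fx.convert("SGD", "IDR", 1);
        fx.convert("SGD", "USD", 1);
        return handler;
    }
}
```

Calling TimingDemo.run().slowest(10) returns the recorded samples ordered by elapsed time, which is the information the report above is built from.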

Stand In Processing

Proxy services such as this can be used to provide stand-in processing when the underlying service is down. For example, when a real FX service is down, this proxy can answer queries from the last successful call.




It is useful for not only enhancing availability but also improving latency, since such a service can answer queries from the local cache right away. Additionally, it may be possible to save some costs if the underlying API is charged by usage.
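The fallback behaviour can be sketched roughly as follows. This is an illustration under assumptions, not the StandInProcessingProxy from the linked project: RateService, StandInHandler and StandInDemo are hypothetical names, and the cache here is a plain in-memory map keyed by method name and arguments.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the FXService interface from the post.
interface RateService {
    double convert(String from, String to, int amount);
}

// Remembers the last successful answer per call and replays it on failure.
final class StandInHandler implements InvocationHandler {
    private final Object target;
    private final Map<String, Object> lastGood = new ConcurrentHashMap<>();

    StandInHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        String key = method.getName() + Arrays.toString(args);
        try {
            Object result = method.invoke(target, args);
            if (result != null) {
                lastGood.put(key, result); // remember the last successful answer
            }
            return result;
        } catch (InvocationTargetException e) {
            Object cached = lastGood.get(key);
            if (cached != null) {
                return cached; // stand in for the failing service
            }
            throw e.getCause(); // nothing cached, surface the real failure
        }
    }
}

final class StandInDemo {
    static double[] run() {
        boolean[] down = {false};
        RateService real = (from, to, amount) -> {
            if (down[0]) {
                throw new IllegalStateException("service down");
            }
            return amount * 1.5; // fake rate, no network call
        };
        RateService fx = (RateService) Proxy.newProxyInstance(
                RateService.class.getClassLoader(),
                new Class<?>[]{RateService.class},
                new StandInHandler(real));
        double first = fx.convert("SGD", "USD", 2);  // answered by the real service
        down[0] = true;
        double second = fx.convert("SGD", "USD", 2); // answered from the cache
        return new double[]{first, second};
    }
}
```

A production version would also need an expiry policy, since stale FX rates can be worse than no answer.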


Chain of proxy

A nice thing about proxies is that multiple proxies can be composed to create a chain of proxies. For example, we can chain Stand In and Method Timing together to get the features of both.



The code snippet below creates a chain of proxies:

FXService core = new FXServiceAPI("https://api.exchangerate.host", 1);
FXService timeRecorderProxy = create(FXService.class, new TimeRecorderProxy(core, tracker));
FXService standInProxy = create(FXService.class, new StandInProcessingProxy(timeRecorderProxy, cache));
FXService fx = standInProxy; 
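The create helper used above is not shown in this post. One plausible shape for it, sketched under that assumption, is a thin generic wrapper over Proxy.newProxyInstance. The Step interface and ChainDemo class are hypothetical and only demonstrate the order in which chained handlers run.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical one-method interface used only for this demonstration.
interface Step {
    String apply(String input);
}

final class Proxies {
    private Proxies() {
    }

    // Creates a proxy for a single interface and hides the cast from callers.
    static <T> T create(Class<T> type, InvocationHandler handler) {
        Object proxy = Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[]{type}, handler);
        return type.cast(proxy);
    }
}

final class ChainDemo {
    static String run() {
        Step core = input -> input + "core";
        // Each handler tags the result before delegating to the next link.
        Step timing = Proxies.create(Step.class,
                (proxy, method, args) -> "timing>" + method.invoke(core, args));
        Step standIn = Proxies.create(Step.class,
                (proxy, method, args) -> "standIn>" + method.invoke(timing, args));
        return standIn.apply(""); // returns "standIn>timing>core"
    }
}
```

The outermost proxy sees the call first, so the order of wrapping decides which concern runs first.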


Full code using all the proxies:

List<String> currency = new ArrayList<>(Arrays.asList("USD", "INR", "GBP", "IDR", "JPY", "CAD"));

IntStream.range(0, 100).forEach(i -> {
    Collections.shuffle(currency);
    currency.parallelStream().forEach(code -> {
        try {
            double d = fx.convert("SGD", code, 1);
            System.out.println(d);
        } catch (Exception e) {
            System.out.println("Failed for " + code);
        }
    });
});

tracker.dumpSlowRequests(10);
cache.prettyPrint();



Code used in this post is available @ fx service

Conclusion

A dynamic proxy is a powerful tool that is part of Java's ecosystem. It can be very useful for writers of libraries or frameworks, since a proxy's primary purpose is to extend the functionality of an underlying service or API. Because the proxy sits in front of that service on every call, special precautions must be taken to ensure that it does not negatively impact it.


 

Dynamic Proxy

In software design, the proxy pattern is one of the popular GoF design patterns.

A proxy is a class functioning as an interface to something else. It could be an interface to some business logic, a network, a file, or anything else.

It can also be seen as a wrapper around something core or real. The main goal of a proxy is to add a layer that provides something extra, such as logging, permission checks, caching, or metric collection.

We interact with proxies in real life. Take the example of interacting with a bank via an ATM.





An ATM acts like a proxy to the bank branch: it allows us to do almost everything that can be done at a branch.


In software we see many variations of a proxy. Some examples are in the Java IO API.
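As one illustration from java.io, a BufferedReader wraps another Reader, exposes the same reading interface, and adds buffering plus readLine on top. The IoWrappingDemo class is a hypothetical name for this sketch.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;

final class IoWrappingDemo {
    // Reads the first line of the text through a wrapping reader.
    static String firstLine(String text) {
        Reader raw = new StringReader(text); // the "real" object
        try (BufferedReader buffered = new BufferedReader(raw)) { // the wrapper
            return buffered.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The caller talks only to the wrapper, and the wrapper forwards the actual reads to the object it holds.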




Another variation is chain of responsibility.  




In Java a proxy can be of 2 types, static or dynamic. Let's look at a static proxy example first.


Static Proxy

We are building a Big Collection that can store unlimited data. Our big collection interface looks like this:

public interface BigCollection<V> {
void add(V value);

boolean exists(V value);

void forEach(Consumer<V> c);
}


A static proxy implements the same interface as the real object and manually delegates calls to it. It looks something like this:


public class BigCollectionProxy<V> implements BigCollection<V> {

private final Supplier<BigCollection<V>> supplier;
private final BigCollection<V> realObject;

public BigCollectionProxy(Supplier<BigCollection<V>> supplier) {
this.supplier = supplier;
this.realObject = supplier.get();
}

@Override
public void add(V value) {
realObject.add(value);
}

@Override
public boolean exists(V value) {
return realObject.exists(value);
}

@Override
public void forEach(Consumer<V> c) {
realObject.forEach(c);
}
}


The client API for using the proxy looks something like this:


BigCollection<String> collection = new BigCollectionProxy<>(AwsCollection::new);

collection.add("Value1");
collection.add("Value2");

collection.forEach(System.out::println);

System.out.println("Exists " + collection.exists("Value2"));

A static proxy is easy to implement, but it has a few problems:

  • Manual delegation is painful and very verbose.
  • Any change in the interface requires the proxy to implement the change too.
  • Methods that are part of the language ecosystem, like equals, hashCode, and getClass, need special treatment.
  • As the name suggests, it is static, so its behavior can't be changed at runtime.

Dynamic proxy solves these issues with static proxy. Let's look at dynamic proxy.

Dynamic Proxy

A dynamic proxy is created at runtime, which makes it very flexible and convenient.

The JDK has had a dynamic proxy API since 1.3 that allows creating a dynamic proxy with a very simple call:

Foo f = (Foo) Proxy.newProxyInstance(Foo.class.getClassLoader(),
                                     new Class<?>[] { Foo.class },
                                     handler);

Let's create a dynamic proxy for the BigCollection interface.

(BigCollection<V>) Proxy.newProxyInstance(BigCollection.class.getClassLoader(),
new Class<?>[]{BigCollection.class},
new BigCollectionDynamicProxy(supplier.get()));

This proxy looks exactly like a BigCollection implementation and can be passed around. It also avoids the verbosity of a static, hand-crafted proxy. The full proxy looks something like this:


public class BigCollectionDynamicProxy implements InvocationHandler {
private final Object realObject;

public BigCollectionDynamicProxy(Object realObject) {
this.realObject = realObject;
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
return method.invoke(realObject, args);
}

public static <V> BigCollection<V> create(Supplier<BigCollection<V>> supplier) {
return (BigCollection<V>) Proxy.newProxyInstance(BigCollection.class.getClassLoader(),
new Class<?>[]{BigCollection.class},
new BigCollectionDynamicProxy(supplier.get()));
}

} 


Java reflection makes it easy to delegate calls to the underlying real object.
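One detail worth noting with reflective delegation: Method.invoke wraps any exception thrown by the real object in an InvocationTargetException, so a handler that wants callers to see the original exception has to unwrap it. The sketch below shows that under assumptions; Store, UnwrappingHandler and UnwrapDemo are hypothetical names.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical interface standing in for BigCollection.
interface Store {
    void add(String value);
}

final class UnwrappingHandler implements InvocationHandler {
    private final Object target;

    UnwrappingHandler(Object target) {
        this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            // Rethrow the exception the real object raised, not the reflective wrapper.
            throw e.getCause();
        }
    }
}

final class UnwrapDemo {
    static String run() {
        Store real = value -> {
            throw new IllegalArgumentException("bad value " + value);
        };
        Store store = (Store) Proxy.newProxyInstance(
                Store.class.getClassLoader(),
                new Class<?>[]{Store.class},
                new UnwrappingHandler(real));
        try {
            store.add("x");
            return "no exception";
        } catch (IllegalArgumentException e) {
            return e.getMessage(); // the caller sees the original exception type
        }
    }
}
```

Without the unwrapping, the caller would receive an UndeclaredThrowableException instead of the IllegalArgumentException.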


Dynamic Proxy Use case

Let's look at some use cases where a dynamic proxy comes in handy.

 - Timing of method execution

Elapsed time calculation is one of the cross-cutting concerns and proxy comes in handy for such use cases without the need of adding time tracking code all over the place.
 
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
long start = System.nanoTime();
try {
return method.invoke(realObject, args);
} finally {
long total = System.nanoTime() - start;
System.out.println(String.format("Function %s took %s nano seconds", method.getName(), total));
}
}


 - Single thread execution

Many use cases need single-threaded access to a critical data structure. A dynamic proxy can add synchronization at a higher level, so the core code does not have to deal with language-level synchronization APIs.

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
synchronized (realObject) {
return method.invoke(realObject, args);
}
}

 - Asynchronous Execution.

Asynchronous execution is one of the common techniques to get the best out of the cores of the underlying machine. Java has made parallel computation very easy with CompletableFuture, parallel streams, etc. Newer JDK versions will have support for fibers, which add the concept of lightweight threads.


A dynamic proxy can be used to convert a synchronous API to an asynchronous one with simple code.

Code snippet for async execution:

es.submit(() -> {
try {
System.out.println("Using thread " + Thread.currentThread().getName());
method.invoke(realObject, args);
} catch (Exception e) {
e.printStackTrace();
}
}); 

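A few things to keep in mind for this type of scenario:
 - The value is not returned, so the client API has to use some mechanism like a callback handler or reply API to get the value.
 - Exceptions are raised on the executor thread, so they need explicit handling to reach the caller.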
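One way to address both points is to expose a second, asynchronous interface whose methods return CompletableFuture, and let the handler complete the future with either the value or the exception. The following is a sketch under that assumption; Greeter, AsyncGreeter, AsyncHandler and AsyncDemo are hypothetical names, and the handler assumes the async interface mirrors the synchronous method names and parameter types.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical synchronous API and its asynchronous mirror.
interface Greeter {
    String greet(String name);
}

interface AsyncGreeter {
    CompletableFuture<String> greet(String name);
}

final class AsyncHandler implements InvocationHandler {
    private final Object target;
    private final Class<?> syncType;
    private final ExecutorService es;

    AsyncHandler(Object target, Class<?> syncType, ExecutorService es) {
        this.target = target;
        this.syncType = syncType;
        this.es = es;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Look up the synchronous method with the same name and parameter types.
        Method sync = syncType.getMethod(method.getName(), method.getParameterTypes());
        CompletableFuture<Object> future = new CompletableFuture<>();
        es.execute(() -> {
            try {
                future.complete(sync.invoke(target, args));
            } catch (InvocationTargetException e) {
                future.completeExceptionally(e.getCause()); // failure of the real method
            } catch (Exception e) {
                future.completeExceptionally(e); // reflective failure
            }
        });
        return future;
    }
}

final class AsyncDemo {
    static String run() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Greeter real = name -> "hello " + name;
            AsyncGreeter async = (AsyncGreeter) Proxy.newProxyInstance(
                    AsyncGreeter.class.getClassLoader(),
                    new Class<?>[]{AsyncGreeter.class},
                    new AsyncHandler(real, Greeter.class, es));
            return async.greet("proxy").join(); // waits for the background call
        } finally {
            es.shutdown();
        }
    }
}
```

The caller decides when to block (join) or chain further work (thenApply, exceptionally), so neither the value nor the exception is lost.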


 - Logging.

This is a straightforward use case and a very popular one.

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
long start = System.nanoTime();
try {
return method.invoke(realObject, args);
} finally {
long total = System.nanoTime() - start;
System.out.println(String.format("Function %s took %s nano seconds", method.getName(), total));
}
}
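The snippet above is the same as the timing handler. A handler that logs the call, its arguments and its outcome might look like the sketch below. It is an illustration only: Calculator, LoggingHandler and LoggingDemo are hypothetical names, and a Consumer<String> stands in for a real logging framework to keep the example dependency free.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical interface used only for this demonstration.
interface Calculator {
    int add(int a, int b);
}

final class LoggingHandler implements InvocationHandler {
    private final Object target;
    private final Consumer<String> log;

    LoggingHandler(Object target, Consumer<String> log) {
        this.target = target;
        this.log = log;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        log.accept("Calling " + method.getName() + Arrays.toString(args));
        try {
            Object result = method.invoke(target, args);
            log.accept("Returned " + result);
            return result;
        } catch (InvocationTargetException e) {
            log.accept("Failed with " + e.getCause());
            throw e.getCause();
        }
    }
}

final class LoggingDemo {
    static List<String> run() {
        List<String> lines = new ArrayList<>();
        Calculator real = (a, b) -> a + b;
        Calculator calc = (Calculator) Proxy.newProxyInstance(
                Calculator.class.getClassLoader(),
                new Class<?>[]{Calculator.class},
                new LoggingHandler(real, lines::add));
        calc.add(2, 3);
        return lines; // ["Calling add[2, 3]", "Returned 5"]
    }
}
```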

Dynamic Proxy Tradeoff

Nothing comes for free. Dynamic proxy has the following tradeoffs:

- Reflection cost
- Parameter boxing/unboxing
- Array overhead for parameters

Reflection-related costs are not much of an issue from JDK 8 onwards. I wrote about reflection in a few posts.

Methodhandle returns back in java 8

Boxing and unboxing overhead is really hard to eliminate, and it will always be an overhead for dynamic proxy unless some code generation technique is used.

Dynamic proxy can be used to build many useful things. In the next part of this post I will cover more advanced usage of dynamic proxy.


Code used in this post is available @ proxy github project



Tuesday, 15 March 2022

Concurrent Heap data structure

Let's Heapify!!!




Heap is a very popular data structure for solving top X types of problems.

For example, finding the top 10 popular items by sales volume, the top X users by activity, etc.

Java's PriorityQueue is based on a heap and can help answer any top X type of query. PriorityQueue is not thread safe, so it can't be used in a highly concurrent environment without adding a lock.

The underlying data structure of a heap is an array, and elements are shifted up and down to maintain the heap order. For each swim or sink operation the whole array must be locked to avoid race conditions.

Even read operations like poll are mutating, which makes it hard to share a heap between multiple threads.

The underlying algorithm makes it very hard to use a heap in a concurrent or parallel environment.
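The mutating nature of poll is easy to see in a small example. The sketch below (PriorityQueueTopX is a hypothetical name) answers a top X query with PriorityQueue and shows that the answered elements are gone from the queue afterwards, so a second reader would get a different result.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class PriorityQueueTopX {
    // Returns the x largest values; poll() removes each of them from the queue.
    static List<Integer> top(PriorityQueue<Integer> queue, int x) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < x && !queue.isEmpty(); i++) {
            result.add(queue.poll());
        }
        return result;
    }

    // Builds a max-heap with a few sample values.
    static PriorityQueue<Integer> sample() {
        PriorityQueue<Integer> queue = new PriorityQueue<>(Comparator.reverseOrder());
        queue.addAll(Arrays.asList(5, 1, 9, 3, 7));
        return queue;
    }
}
```

After top(sample(), 3) returns [9, 7, 5], only 2 elements remain in that queue, and none of this is safe without external locking when several threads are involved.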

Heap - Source: Wikipedia

Let's look at other options to achieve heap-like functionality without giving up on concurrency.

A concurrent heap data structure needs the following properties:

  • Highly concurrent ordered collection.
  • Parallel write/read support.
  • Top X type of API.
  • Multiple top operations supported concurrently using the same instance of the data structure.

The concurrent skip list from the JDK looks like a good candidate for this, but we need to add some missing functionality.

Let's recap how the skip list data structure looks.


SkipList - Source: Wikipedia

A skip list is an ordered, multi-level linked list with fast lanes and slow lanes. The fast lanes allow finding an element at approximately log(n) cost.


- Unique Item identification 

The JDK has set and map implementations of the skip list. A map or set can only hold unique keys, so we need a way to work around the unique key requirement to make the set behave like a heap.

We can use a little trick: every value added to the skip list carries additional metadata, which can be a running sequence number or a timestamp. This extra metadata is used to break the tie when 2 items compare as equal.

Let's take the product by sales use case for the code samples.

SalesItem is Comparable and it compares items by sales volume. 

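class SalesItem implements Comparable<SalesItem> {

    private final String product;
    private final long sales;

    SalesItem(String product, long sales) {
        this.product = product;
        this.sales = sales;
    }

    // Orders items by sales volume only.
    @Override
    public int compareTo(SalesItem o) {
        return Long.compare(sales, o.sales);
    }
}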

We can't add SalesItem to the skip list directly, because items with the same sales volume will be rejected as duplicates.

We can add a wrapper class that carries the extra metadata to handle this problem. It looks something like this:
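The rejection is easy to reproduce. In the sketch below, Sale is a hypothetical copy of SalesItem with a constructor added, and DuplicateDemo is a hypothetical driver; the set decides equality purely through compareTo.

```java
import java.util.concurrent.ConcurrentSkipListSet;

// Hypothetical copy of SalesItem, compared by sales volume only.
final class Sale implements Comparable<Sale> {
    final String product;
    final long sales;

    Sale(String product, long sales) {
        this.product = product;
        this.sales = sales;
    }

    @Override
    public int compareTo(Sale o) {
        return Long.compare(sales, o.sales);
    }
}

final class DuplicateDemo {
    // Returns {firstAdded, secondAdded, size} as observed on the set.
    static Object[] run() {
        ConcurrentSkipListSet<Sale> set = new ConcurrentSkipListSet<>();
        boolean first = set.add(new Sale("book", 100));
        boolean second = set.add(new Sale("pen", 100)); // same sales volume
        return new Object[]{first, second, set.size()};
    }
}
```

The second add returns false and the set keeps a single element, even though the two products are different.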


// Inner class of the heap: T and heapType come from the enclosing class.
class Item implements Comparable<Item> {
    private final T value;
    private final long index;

    @Override
    public int compareTo(Item o) {

        int r = this.value.compareTo(o.value);
        r = heapType.equals(HeapType.Max) ? -r : r;
        if (r != 0) {
            return r;
        }
        // Tie on value: fall back to insertion order.
        return Long.compare(index, o.index);
    }
}

index is the extra metadata added to handle items with the same sales volume: in case of a tie, items are ordered by index.

 - TopX API

For the TopX API, Stream.limit can be used. Another benefit of using the Streams API is that the client application can use its other cool features.
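As a small illustration of the idea, the sketch below fills a ConcurrentSkipListSet from a parallel stream and then reads the top X with limit. TopXDemo is a hypothetical name, and plain integers are used so that the duplicate-key problem does not arise.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class TopXDemo {
    // Adds 1..1000 from multiple threads and returns the x largest values.
    static List<Integer> top(int x) {
        ConcurrentSkipListSet<Integer> data =
                new ConcurrentSkipListSet<>(Comparator.reverseOrder());
        IntStream.rangeClosed(1, 1000).parallel().forEach(data::add);
        return data.stream().limit(x).collect(Collectors.toList());
    }
}
```

Because the set is always kept in order, top(3) only walks the first 3 elements and returns [1000, 999, 998] without removing anything.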

Full Code for Concurrent Heap

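import java.util.NavigableSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

public class ConcurrentHeap<T extends Comparable<? super T>> {

    // Running sequence number used to break ties between equal values.
    private final AtomicLong id = new AtomicLong();
    private final NavigableSet<Item> data = new ConcurrentSkipListSet<>();
    // HeapType (with the Max constant used below) is defined in the linked project.
    private final HeapType heapType;

    public ConcurrentHeap(HeapType heapType) {
        this.heapType = heapType;
    }

    public void add(T value) {
        data.add(new Item(value, id.incrementAndGet()));
    }

    public Stream<T> stream() {
        return data
                .stream()
                .map(v -> v.value);
    }

    public Stream<T> top(int x) {
        return stream().limit(x);
    }

    class Item implements Comparable<Item> {
        private final T value;
        private final long index;

        Item(T value, long index) {
            this.value = value;
            this.index = index;
        }

        @Override
        public int compareTo(Item o) {

            int r = this.value.compareTo(o.value);
            r = heapType.equals(HeapType.Max) ? -r : r;
            if (r != 0) {
                return r;
            }
            // Tie on value: fall back to insertion order.
            return Long.compare(index, o.index);
        }
    }
}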


The underlying data structure that behaves like a heap is NavigableSet. The JDK has 2 implementations of it: TreeSet and ConcurrentSkipListSet.

We can choose between TreeSet and ConcurrentSkipListSet based on need, to avoid the cost of concurrency in a single-threaded environment.

 

Full working code for this blog post is available @ github