Skip to content

Cache based on futures. Idea's taken from the spray caching library, and created in java 8

Notifications You must be signed in to change notification settings

tootedom/herdcache

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

HerdCache

Overview

The cache borrows heavily from the concepts, and implementation of caching in spray-caching

The idea here being that the cache returns a future rather than a value. The value is that of a given generic type. The benefit of this approach is that it nicely takes care of thundering herds/cache miss storm issue.

This is where many requests to a particular cache key (e.g. a resource URI) arrive at the same time. I.e. a popular news story or similar. The result is you have a lot of requests to the same memcached server, at the same time, to fetch the same value. This puts huge load on that 1 memcached server, which could result in an outage. If the item is not in the cache all the requests end up hitting the upstream services that generate the cache value.

The result being that the cache key is requested ( the processing associated with calculating the value is performed ) multiple times. As a result the backend resource is put under undue load.

Returning a future from the cache means that multiple resources for a missing cache value that hasn’t been calculated, wait on the same future that is executing for the initial/first request for the cache key.

Returning a future also means that one request for a cache value from memcached can satisfy many requests.


New Features

2.0.0 sees support for RxJava, and a new implementation of the caching to support RxJava. There is a new interface:

` public interface ObservableCache<V extends Serializable> `

Sorry if you wanted RxJava support to be backwards compatible. It was just easier, and more efficient to implement the cache with RxJava support, as a completely new Interface and Implementation.

The RxJava implementation has been tested with 1.1.1 and 1.2.0. The dependency is 1.2.0. The only reason for this is that Single is no longer beta in 1.2.0 (so I can only guess there’s many many fixes between 1.1.1 and 1.2.0).

The implementations are SpyObservableMemcachedCache and ElastiCacheObservableMemcachedCache. Methods return Single<CacheItem<V>>

See details future on for information regarding RxJava support.


Breaking Changes

TL;DR If you are using defaults your keys (under which items will be stored), will be different. As a result, lots of cache misses.

In all versions < 2, the following was the default for key hashing. By Key hashing, I mean the value against which an item is ultimately stored in memcached. If you request an item to be stored under "bob", the keyHashing changes this to a fixed width string that is the hash value, i.e. "45867239"

` KeyHashingType.NATIVE_XXHASH `

This in hindesight (ah the beauty) is a stupid default. Why? key collision. Which means the two items you ask to be stored in the cache under different keys, could actually be stored under the same key. i.e. collide. The hashing algorithm with least collisions would have been the following (and much quicker on 64bit systems):

` KeyHashingType.NATIVE_XXHASH_64 `

However, you’d still have the chance of a collision. The best default is NONE. The default to prefer "correctness" over performance. The reason of chosing a hashing algorithm was to remove the "cache key checking" that the memcached client does for each key (MAX_KEY_LENGTH == 250), on any operation. With a fixed length string, the below was not required:

  public static void validateKey(final String key, final boolean binary) {
    byte[] keyBytes = KeyUtil.getKeyBytes(key);
    int keyLength = keyBytes.length;

    if (keyLength > MAX_KEY_LENGTH) {
      throw KEY_TOO_LONG_EXCEPTION;
    }

    if (keyLength == 0) {
      throw KEY_EMPTY_EXCEPTION;
    }

    if(!binary) {
      for (byte b : keyBytes) {
        if (b == ' ' || b == '\n' || b == '\r' || b == 0) {
          throw new IllegalArgumentException(
              "Key contains invalid characters:  ``" + key + "''");
        }
      }
    }

  }

However, as described we run the risk of "collisions". Meaning you could get the unexpected. A value stored under a key that you did not expect.

As a result, the default hash changed to:

KeyHashingType.NONE

The result of this is that no hashing of the key is done. The string you pass, is what the key is stored under in memcached, you need not worry about collisions.

If you want to disable the key checking that is done for each operation, as you know your keys conform. You can set the following:

.setKeyValidationType(KeyValidationType.NONE)

This disables the previously mentioned key validation. If you do have a key that doesn’t conform, you will see messages like the following in your logs:

2016-09-24 12:36:11.176 ERROR net.spy.memcached.protocol.ascii.StoreOperationImpl:  Error:  CLIENT_ERROR bad command line format

Usage

The below details how to use the caching implementation, and the various ways to talk to the cache, which fall into 3 categories:

  • GET (check if a value is in the cache)

  • SET (set a value regardless of it is exists or not)

  • APPLY (only set a value, if it doesn’t exist currently)

Each of the write methods, apply and set, allow the value to be calculated from a Supplier<V> function.

Dependency

<dependency>
  <groupId>org.greencheek.caching</groupId>
  <artifactId>herdcache</artifactId>
  <version>2.0.1</version>
</dependency>

Please note that 0.1.0 is not backwards compatible with 1.0.1. 1.0.1 extends the Cache interface to include a couple of get methods. Therefore, introduction a breaking change with the old api.

Cache Types

There are currently two main sections of interface types: Observable and NonObservable. Observable is the support for RxJava.

The observerable interface is ObservableCache<V extends Serializable> and the implementations:

  • SpyObservableMemcachedCache<V extends Serializable>

  • ElastiCacheObservableMemcachedCache<V extends Serializable>

The non-observable interface is as follows. There are currently two types of Cache interface. Cache<V> interface and the CacheWithExpiry<V> that extends upon the Cache<V>.

The two implementation’s of the CacheWithExpiry<V>.

  • SpyMemcachedCache<V>

  • ElastiCacheMemcachedCache<V>

The are a couple of implementations of the Cache<V> interface. However, these are deprecated and will not be discussed further:

  • SimpleLastRecentlyUsedCache<V>

  • ExpiringLastRecentlyUsedCache<V>


Observable

This is new as of 2.0.1, and is an implementation of herd cache using RxJava. The Single<CacheItem<V>> has been tested with version 1.1.1 and 1.2.0, and appears to be working as expected (on my machine. dot tm')

RxJava Support

The new observable cache interface ObservableCache<V extends Serializable>, is focused at RxJava support. The two implementations:

  • SpyObservableMemcachedCache<V extends Serializable>

  • ElastiCacheObservableMemcachedCache<V extends Serializable>

implement this interface, which has the methods:

  • default public Single<CacheItem<V>> apply(String key, Supplier<V> computation, Duration timeToLive) // Supplier Values always valid, and Cache values always valid

  • default public Single<CacheItem<V>> apply(String key, Supplier<V> computation, Duration timeToLive,Predicate<V> isSupplierValueCachable) // Cache values always valid

  • default Single<CacheItem<V>> apply(String key, Supplier<V> computation,Predicate<V> isSupplierValueCachable,Predicate<V> isCachedValueValid) // No TTL for cached item

  • default public Single<CacheItem<V>> set(String keyString, Supplier<V> value, Duration timeToLive) // Supplier Value is always valid

  • default public Single<CacheItem<V>> set(String keyString, V value, Duration timeToLive) // Supplier Value is always valid

  • public Single<CacheItem<V>> get(String key);

  • public Single<CacheItem<V>> apply(String key, Supplier<V> computation, Duration timeToLive,Predicate<V> isSupplierValueCachable,Predicate<V> isCachedValueValid)

  • public Single<CacheItem<V>> set(String keyString, Supplier<V> computation, Duration timeToLive,Predicate<V> canCacheValueEvaluator);

  • public Single<Boolean> clear(String key);

  • public void shutdown();

All implementations return the RxJava Observable implementation Single<CacheItem<V>>. Which means a single value is returned, or a Throwable.

The RxJava Support does not implement a "Stale With Revalidate" notion. The reason being is that this is entirely implementable/customizable on the client side.

All of the Single observables that are returned by the implementation are Cold Observables.

CacheItem

The CacheItem<V> return type is just a wrapper around your V implementation type. V is the object type that is stored/serialised to memcached.

The CacheItem<V> has a few method to make interacting with teh domain object a little less painful.

For example lets take the cache get: cache.get(string) that will eventually return CacheItem<String>, all being well.

CacheItem<String> item = cache.get("string").toBlocking().value()

The get against memcached might either return a value, or return "null". Therefore the value returned is wrapped in a Optional<String>

  • To get the Optional:

Optional<String> v = item.getValue() Optional<String> v = item.optional()

  • If you would rather just have the value and check for null:

String v = item.value()

  • If you want to return a default value, if 'null' was returned:

String v = item.value("default")

  • If you want to check if nothing was returned:

item.isEmpty()

  • If you want to check if something was returned:

item.hasValue()

  • If you want to obtain the key the item is cached under

item.getKey()

  • If you want to check if the result was from the cache

item.isFromCache()


RxJava Write To Memcache (to be aware of)

The apply and set methods, the actual write to memcached can be either synchronous or asynchronous. By default the memcached write is that of asynchronous.

  • public Single<CacheItem<V>> apply(String key, Supplier<V> computation, Duration timeToLive,Predicate<V> isSupplierValueCachable,Predicate<V> isCachedValueValid)

  • public Single<CacheItem<V>> set(String keyString, Supplier<V> computation, Duration timeToLive,Predicate<V> canCacheValueEvaluator);

It is the value of the MemcachedCacheConfigBuilder.setWaitForMemcachedSet(true|false) that determines if the writing of the value to memcached runs on the same scheduler as .subscribeOn or not.

When set to true, the memcached write will be written on the same scheduler that the .subscribeOn runs (http://reactivex.io/documentation/operators/subscribeon.html).

When it is set to false, the default is for the memcached write to run on the Schedulers.io thread pool. You can choose the pool the write operates on by setting MemcachedCacheConfigBuilder.setWaitForMemcachedSetRxScheduler(Scheduler scheduler). Setting it to schedulers.immediate() is the same as MemcachedCacheConfigBuilder.setWaitForMemcachedSet(true)

When setting MemcachedCacheConfigBuilder.setWaitForMemcachedSet(false) you need to be wary of the default timeout for the memcached write to complete in, which is 2 seconds. If you which to increase this set MemcachedCacheConfigBuilder.setWaitDuration(Duration d) to something larger. For example:

.setWaitDuration(Duration.ofSeconds(5))

What about Stale While Revalidate

The ObservableCache is returning the domain object of CacheItem. This has a isFromCache method that tells you if the apply(…​) command returned the value from cache or not. You can use this to start you "stale" cache implementation.

If you want to implement your stale caching implementation based on the "actual" cache value you cached, then is entirely possible. See the test org.greencheek.caching.herdcache.memcached.observable.TestExampleStaleWhileRevalidateObservableCache as an example based on the actual value.


Examples

Set an item in the cache

Single<CacheItem<String>> val = cache.set("Key1", () -> "SomeValue",Duration.ofSeconds(60));
// The above just creates the Observable.  Nothing has happened.  If you do not subscribe
// The item will never be set in the cache.


// Register a subscriber to have the set execute
String itemSet = val.toBlocking().value().value());

Get Or Set an item in the cache

If "Key1" already exists in the cache then it is returned. Otherwise the new value is returned, and set in the cache

Single<CacheItem<String>> val = cache.apply("Key1", () -> "NewValue",Duration.ofSeconds(60));
// The above just creates the Observable.  Nothing has happened.  If you do not subscribe
// The item will never be set in the cache.


// Register a subscriber to have the set execute
String itemSet = val.toBlocking().value().value());

Get an item from the cache

Single<CacheItem<String>> val = cache.get("Key1")
// The above just creates the Observable.  Nothing has happened.  If you do not subscribe
// The item will never be set in the cache.


// Register a subscriber to have the get execute, and return a value
String item = val.toBlocking().value().value());

Runtime Exceptions from Supplier<V>

In all of the above examples, we assume no Runtime Exceptions occur, when the value is obtained from the Supplier<V> (from set(…​) and apply(…​)). If the Supplier throws an exception they will be propagated to the Subscriber. (Exceptions from readding or writing to memcached will NOT be propagate to Subscribers).

Therefore, if you expect that an exception can be thrown from your supplier, you should pass in the appropriate onError action to the subscribe. Example:

    class MyFunkyException extends RuntimeException {

    }

    @Test
    public void testRuntimeException() {

        cache = new SpyObservableMemcachedCache<>(
                new ElastiCacheCacheConfigBuilder()
                        .setMemcachedHosts("localhost:" + memcached.getPort())
                        .setTimeToLive(Duration.ofSeconds(10))
                        .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                        .setWaitForMemcachedSet(true)
                        .setWaitForRemove(Duration.ofMillis(0))
                        .setKeyPrefix(Optional.of("elastic"))
                        .buildMemcachedConfig()

        );

        Single<CacheItem<String>> val = cache.set("Key1", () -> {throw new MyFunkyException();} , Duration.ofSeconds(60));

        boolean errorThrown = false;
        try {
            String item =  val.toBlocking().value().value();
        } catch (MyFunkyException e) {
            errorThrown = true;
        }

        assertTrue("should have thrown custom exception",errorThrown);

        final AtomicBoolean success = new AtomicBoolean(false);
        final AtomicBoolean failure = new AtomicBoolean(false);

        assertEquals(0, memcached.getDaemon().getCache().getCurrentItems());

        val.subscribe(
                new Action1<CacheItem<String>>() {
                    @Override
                    public void call(CacheItem<String> stringCacheItem) {
                        success.set(true);
                    }
                },
                new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        if(throwable instanceof MyFunkyException) {
                            failure.set(true);
                        }
                    }
                });

        assertFalse("should have thrown an exception",success.get());
        assertTrue("should have thrown custom exception", failure.get());
    }

subscribeOn and observeOn

If there ever was a confusing subject for RxJava, this is it. subscribeOn is where the Observable’s code will execute. observeOn is where the Subscribers code will execute.

Lets say we are running on the main Thread and we execute this:

        CountDownLatch latch = new CountDownLatch(1);
        String value = "value1";
        Single<CacheItem<String>> val = cache.apply("Key1", () -> {
            System.out.println("Supplier Value: " + Thread.currentThread().getName());
            return value;
        }, Duration.ofSeconds(60));

        val.subscribe(calculatedValue -> {
            System.out.println("subscription: " + Thread.currentThread().getName());
            latch.countDown();
        });

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

The output would be:

Supplier Value: main
subscription: main

The above means that the execution of the following all occur on the main thread:

  • memcached lookup (get)

  • executing the Supplier<V>

  • memcached write (set)

  • Calling the Subscribers code

If we now set subscribeOn(Schedulers.io()) on the Single<CacheItem<String>> what this does is, excute the following on the IO scheduler RxIoScheduler:

  • memcached lookup (get)

  • executing the Supplier<V>

  • memcached write (set)

  • Calling the Subscribers code

        CountDownLatch latch = new CountDownLatch(1);
        String value = "value1";
        Single<CacheItem<String>> val = cache.apply("Key1", () -> {
            System.out.println("Supplier Value: " + Thread.currentThread().getName());
            return value;
        }, Duration.ofSeconds(60));
        val = val.subscribeOn(Schedulers.io());

        val.subscribe(calculatedValue -> {
            System.out.println("subscription: " + Thread.currentThread().getName());
            latch.countDown();
        });

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

You will be able to see the following in the output.

Supplier Value: RxIoScheduler-2
subscription: RxIoScheduler-2

If we now set observeOn(Schedulers.computation()) on the Single<CacheItem<String>> what this does is, excute the following on the IO scheduler RxIoScheduler:

  • memcached lookup (get)

  • executing the Supplier<V>

  • memcached write (set)

And execute the following on the computation scheduler, RxComputationScheduler:

  • Calling the Subscribers code

        CountDownLatch latch = new CountDownLatch(1);
        String value = "value1";
        Single<CacheItem<String>> val = cache.apply("Key1", () -> {
            System.out.println("Supplier Value: " + Thread.currentThread().getName());
            return value;
        }, Duration.ofSeconds(60));
        val = val.subscribeOn(Schedulers.io());
        val = val.observeOn(Schedulers.computation());

        val.subscribe(calculatedValue -> {
            System.out.println("subscription: " + Thread.currentThread().getName());
            latch.countDown();
        });

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

You will be able to see the following in the output.

Supplier Value: RxIoScheduler-2
subscription: RxComputationScheduler-1

Async Write to memcached

It is the value of the MemcachedCacheConfigBuilder.setWaitForMemcachedSet(true|false) that determines if the writing of the value to memcached runs on the same scheduler as .subscribeOn or not.

When set to true, the memcached write will be written on the same scheduler that the .subscribeOn runs (http://reactivex.io/documentation/operators/subscribeon.html).

When it is set to false, the default is for the memcached write to run on the Schedulers.io() thread pool. You can choose the pool the write operates on by setting MemcachedCacheConfigBuilder.setWaitForMemcachedSetRxScheduler(Scheduler scheduler). Setting it to schedulers.immediate() is the same as MemcachedCacheConfigBuilder.setWaitForMemcachedSet(true)

When setting MemcachedCacheConfigBuilder.setWaitForMemcachedSet(false) you need to be wary of the default timeout for the memcached write to complete in, which is 2 seconds. If you which to increase this set MemcachedCacheConfigBuilder.setWaitDuration(Duration d) to something larger. For example:

.setWaitDuration(Duration.ofSeconds(5))

Non Observable

This is using traditional futures, and the original implementation of herdcache.

Cache Interface

The cache interface that the beginning of its life, used to have a single method apply that took:

That method was: ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService)

The returned value is that of Guava’s ListenableFuture, upon which you can attach a callback, or wait for a value to be generated:

The cache has now been extended to have more methods, as well as the introduction of a second interface CacheWithExpiry<V>. You will probably most likely work with the CacheWithExpiry interface.

As mentioned there are 3 types of methods on the interfaces: get, set, apply Both method types, get and apply, lookup a value in the cache that is associated with a key. The difference between the get and the apply, is that the apply can generate the value, whilst the get only looks up in the cache. The set on the other hand only ever sets a value in the cache.

Both get methods lookup a cache value, always returning a Guava’s ListenableFuture

The below shows a couple of examples of working with the returned ListenableFuture.

  • Adding a callback:

// Executes on the calling thread
Futures.addCallback(future,new FutureCallback<String>() {
                        @Override
                        public void onSuccess(String result) {

                        }

                        @Override
                        public void onFailure(Throwable t) {

                        }
                   });


// Executes on the passing in executor thread pool
private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));

Futures.addCallback(val,new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {

            }

            @Override
            public void onFailure(Throwable t) {

            }
},executorService);
  • Waiting for the value (or failure)

        try {
            future.get();
        } catch (InterruptedException e) {

        } catch (ExecutionException e) {
            // Any exception that occurred in the Supplier will be the .getCause()
        }

CacheWithExpiry Interface

More likely than not, you will be interacting with this interface. This interface extends upon the Cache<V> interface to allow you to provide method level durations for items stored in the cache.

The list of available methods are:

  • public ListenableFuture<V> apply(String key, Supplier<V> computation)

  • public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService)

  • public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService,Predicate<V> canCacheValueEvalutor)

  • public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService,Predicate<V> canCacheValueEvalutor,Predicate<V> isCachedValueUsable);

  • public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive, ListeningExecutorService executorService)

  • public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive, ListeningExecutorService executorService, Predicate<V> isSupplierValueCachable)

  • public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService, Predicate<V> isSupplierValueCachable,Predicate<V> isCachedValueValid)

  • public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive, ListeningExecutorService executorService, Predicate<V> isSupplierValueCachable,Predicate<V> isCachedValueValid)

  • public ListenableFuture<V> get(String key)

  • public ListenableFuture<V> get(String key,ListeningExecutorService executorService)

  • public ListenableFuture<V> set(String keyString, V value)

  • public ListenableFuture<V> set(String keyString, Supplier<V> value)

  • public ListenableFuture<V> set(String keyString, V value, ListeningExecutorService executorService)

  • public ListenableFuture<V> set(String keyString, Supplier<V> value, ListeningExecutorService executorService)

  • public ListenableFuture<V> set(String keyString, Supplier<V> value, Predicate<V> canCacheValueEvalutor, ListeningExecutorService executorService)

  • public ListenableFuture<V> set(String keyString, Supplier<V> computation, Duration timeToLive,Predicate<V> canCacheValueEvaluator,ListeningExecutorService executorService)

  • public ListenableFuture<V> set(String keyString, Supplier<V> value, Duration timeToLive)

  • public ListenableFuture<V> set(String keyString, V value, Duration timeToLive)

  • public ListenableFuture<V> set(String keyString, V value, Duration timeToLive, ListeningExecutorService executorService)

  • public ListenableFuture<V> set(String keyString, Supplier<V> value, Duration timeToLive, ListeningExecutorService executorService)

The apply(…​) method returns a Future that wraps both the lookup for the cache value in memcached and if no value exists in memcached, the generation of the value from the Supplier<V>

Waiting on futures

The Cache<V> interface inherits a Utility interface (AwaitOnFuture<V>) that gives you a couple of utility methods that allow you to wait on futures, for a value to be calculated

  • V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue)

  • V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue, V onTimeoutValue, long duration, TimeUnit timeUnit)

Wait on future, with fallback value incase of exception

A the value returned back from a cache apply is that of a ListenableFuture. You can naturally wait on the currently executing thread (blocking that thread), for a value to be returned. This is as follows:

try {
   return future.get();
} catch (Exception e) {
   return somefallback;
}

The method V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue), remove the ceremony of the try/catch block for you.

The other method V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue, V onTimeoutValue, long duration, TimeUnit timeUnit) allows you wait a finite amount of time for a value to be returned. The amount of time elapsed, the onTimeoutValue is going to be returned. Any other exception results in the onExceptionValue being thrown.


CacheWithExpiry

There are two implementations of the CacheWithExpiry<V> interface:

  • SpyMemcachedCache<V>

  • ElastiCacheMemcachedCache<V>

The second implementation ElastiCacheMemcachedCache<V> is an extension of the SpyMemcachedCache<V> implementation for working with Amazon AWS’s memcached support (known as ElastiCache).

The CacheWithExpiry<V> interface differs from that of the Cache<V>, by having Duration element as part of the cache method. This allows you to specify the duration (length of time) that the item lives in the cache.


Defaults

Both the following cache classes use the following defaults.

The ElastiCacheCacheConfigBuilder extends the abstract class MemcachedCacheConfigBuilder which contains the defaults for which the SpyMemcachedCache<V> will execute. The builder allows you to override the defaults:

The following defaults are for both memcached and ElastiCache memcached

Method Default Description

setTimeToLive

Duration.ofSeconds(60);

The default expiry time an item with be given if not specified

setMaxCapacity

1000;

Max number of futures to internal cache whilst a value is being calculated. This is NOT the max number of items cachable in memcached

setMemcachedHosts

"localhost:11211";

Comma separated host list

setHashingType

ConnectionFactoryBuilder.Locator.CONSISTENT;

Using consistent hashing, don’t change

setFailureMode

FailureMode.Redistribute;

When an error occurs, what should occur (FailureMode.Retry may suit you better for this)

setHashAlgorithm

DefaultHashAlgorithm.KETAMA_HASH;

Type of consistent hashing to be used for calculating the memcached node to talk to, don’t change

serializingTranscoder

new FastSerializingTranscoder();

The type of serializer to be used. Class responsbile for serialising java objects to a byte stream to store in memcached

protocol

ConnectionFactoryBuilder.Protocol.BINARY;

the protocol used for talking to memcached

readBufferSize

DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE;

default socket buffer size when talking to memcached, do not change

memcachedGetTimeout

Duration.ofMillis(2500);

when looking in memcached for a matching key, this is the amount of time to wait before timing out

dnsConnectionTimeout

Duration.ofSeconds(3);

When resolving the memcachedHosts to ip addresses, the amount of time to wait for dns lookup, before ignoring that node

waitForMemcachedSet

false

Wait for the write to memcached to occur before removing future from internal cache

setWaitDuration

Duration.ofSeconds(2);

amount of time to wait for the memcached set

keyHashType

KeyHashingType.NATIVE_XXHASH;

how the cache key is hashed. The key is not stored verbatim in memcache and is hash to a number first. This is the hashing algorithm used.

keyPrefix

Optional.empty()

should the key used in lookup, be prefixed with a string to avoid the unlikely event of a key claash.

asciiOnlyKeys

false;

we only have ascii keys that will be stored in the cache

hostStringParser

new CommaSeparatedHostAndPortStringParser();

do not change

hostResolver

new AddressByNameHostResolver();

do not change

useStaleCache

false;

Whether stale caching is enabled

staleCacheAdditionalTimeToLive

Duration.ZERO;

The amount of time extra that items will be stored in the stale cached

staleCachePrefix

"stale";

The prefix for stale keys, to avoid clash

staleMaxCapacity

-1;

The size of the cache for futures for the stale cache is the same as the maxCapacity if -1

staleCacheMemachedGetTimeout

Duration.ZERO

Time to wait for lookups against the stale cache

removeFutureFromInternalCacheBeforeSettingValue

false;

When the Supplier<V> computation is completed the future is set with the computed value, and removed from the internal cache. This is whether (if false) specifies that we set the future to complete, before removal of the internal future cache. Or (true), remove the future from map first and then set the future value

metricRecorder

no metric recorder

Can take a new YammerMetricsRecorder that will record metrics in a CodeHale Metric Registry

compressionAlgorithm

SNAPPY

The type of compression algorithm to use when values are stored in memcached. LZ4 is the quickest implementation

herdProtectionEnabled

true

If you which to turn off herd cache protection

setKeyValidationType

BY_HASHING_TYPE

If you which to turn off validation of your keys, as you know they conform (KeyValidationType.NONE)

The following default apply just to that of ElastiCache memcached

Method

Default

Description

setElastiCacheConfigHosts

"localhost:11211";

The memcached elasticache config host name i.e. yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211

setConfigPollingTime

Duration.ofSeconds(60);

The frequency by which to contact the config host for potential updates to the memcached nodes

setInitialConfigPollingDelay

Duration.ZERO;

The time for the initial poll to the config host to obtain the memcached nodes

setConnectionTimeoutInMillis

Duration.ofMillis(3000);

The time for establishing a connection to the config host before stopping and retrying

setIdleReadTimeout

Duration.ofSeconds(125);

If the client does also receive any data from the ElastiCache Configuration Endpoint, a reconnection will be made; this idle period is controlled by the setting idleReadTimeout.

setReconnectDelay

Duration.ofSeconds(5);

The delay between performing a reconnection attempt to the config host

setDelayBeforeClientClose

Duration.ofSeconds(300);

When the ElastiCache Configuration Endpoint, outputs a configuration update a new spy memcached client is created, and the old client is closed. There a delay before the old client is closed, as it may still be in use

setNumberOfConsecutiveInvalidConfigurationsBeforeReconnect

3

If the config host returns invalid config this number of times in a row, a reconnection will be made

setUpdateConfigVersionOnDnsTimeout

true;

Set to false, if you don’t want to acknowledge a config update, if a dns resolution for any of the memcached nodes failed

setMemcachedClientType

SPY

Default memcached client is that of SPY. Folsom is available, but not fully tested, therefore do not change

setUseFolsomStringClient

false

If we are just storing string. Folsom specific (do not use)

setFolsomCharset

UTF-8

do not use

setFolsomConnections

10

do not use

setFolsomRequestTimeout 3000

do not use

setFolsomMaxOutstandingRequests


Herd Protection

The SpyMemcachedCache<V> or ElastiCacheMemcachedCache<V> thundering herd protection is made available by the use of an internal cache. The get and apply methods make use of this internal cache. It is this internal cache that has a finite size that is specified by setMaxCapacity. When the get or apply methods are called, this cache is checked for an existing ListenableFuture<V>. If one exists this means a previous execution of get or apply is running that is either fetching the value from memcached or is pending on the the Supplier<V> to generate the value.

If an existing ListenableFuture<V> is available in the internal cache this is returned to the user. If one does not exist a new ListenableFuture<V> will be create and returned.

Disable Herd Protection

It might be the case that you do not need the herd protection (you have a long tail where no one key is hot). And instead you want to use the Future, Supplier and Predicate functionality of herdcache, withou the 'herd'. This can be done, by setting herdProtectionEnabled to false. This replaces the internal cache with a NoOp cache.

SpyMemcachedCache

The SpyMemcachedCache<V> implementation uses the spy memcached java library to communicate with memcached. The implementation is similar to that of SimpleLastRecentlyUsedCache in that it uses a ConcurrentLinkedHashMap to store the cache key against an executing future.

When two requests come for the same key, the future is stored in an internal ConcurrentLinkedHashMap:

store.putIfAbsent(keyString, future)

If a subsequent request comes in for the same key, and the future has not completed yet, the existing future in the ConcurrentLinkedHashMap is returned to the caller. This way two requests wait on the same executing Supplier<V> computation

When constructing the SpyMemcachedCache, you can specify the max size of the internal ConcurrentLinkedHash that is used to store the concurrently executing futures.

Unlike the SimpleLastRecentlyUsedCache implementation, that stores the Completed futures in the ConcurrentLinkedHash for subsequent cache hits to obtain the completed future’s value, the SpyMemcachedCache<V> cache removes the key and associated future from the internal ConcurrentLinkedHash. The value of the completed future is instead stored in memcached for subsequent retrieval.

Before the Supplier<V> computation is submitted to the passed executor for execution, the memcached cluster is checked for the existance of a value for the given key. If a value is present in memcached, the returned future will be set with the obtained value. This means that if two request comes in for the same key, for which a value is present in memcached they will wait on the same future to have it’s value set to that of the memcached cache hit.

If a value does not exist in the memcached, then the given Supplier<V> computation is submitted to the provided executor, for execution. Once the value has been calculated, it is sent over the network to memcached for storage.

With this library the value is stored asynchronously in memcached, and the future completed with the computed value and sub-sequentially the future is removed from the ConcurrentLinkedHashMap. Therefore, there is a slim time period, between the completion of the future and the value being saved in memcached. This means a subsequent request for the same key could be a cache miss.

It is possible when constructing the SpyMemcachedCache to specify to a period of time (i.e. make the asynchronous set into memcached call semi synchronous) to wait for the set to occur.

The SpyMemcachedCache is created by passing a MemcachedCacheConfig. A MemcachedCacheConfig is created via that of a ElastiCacheCacheConfigBuilder that contains the method public MemcachedCacheConfig buildMemcachedConfig() that build the CacheConfig for both the ElastiCacheMemcachedCache and the SpyMemcachedCache

The following show various ways of configuring the cache:


Constructing the SpyMemcachedCache

        cache = new SpyMemcachedCache<>(
                new ElastiCacheCacheConfigBuilder()
                        .setMemcachedHosts("localhost:11211")
                        .setTimeToLive(Duration.ofSeconds(60))
                        .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                        .buildMemcachedConfig()
        );

        ListenableFuture<String> val = cache.apply("Key1", () -> {
            return "value1";
        }, Duration.ofSeconds(3), executorService);

        assertEquals("Value should be key1","value1", cache.awaitForFutureOrElse(val null));

Specifying the Memcached hosts

By default the host string is localhost:11211, however, you can specify a number of hosts to connect to by specifying them as a comma separated string in the Builder:

         CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
                 new ElastiCacheCacheConfigBuilder()
                         .setMemcachedHosts("localhost:11211,localhost:11212,localhost:11213"))
                         .buildMemcachedConfig()
                 );

When the SpyMemcachedCache is passed the list of memcached hosts, the ip address for host needs to be resolved. By default 3 seconds, per host, is waited for to obtain the ip address. This can be controlled, like as follows:

         CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
                 new ElastiCacheCacheConfigBuilder()
                         .setMemcachedHosts("localhost:11211,localhost:11212,localhost:11213"))
                         .setDnsConnectionTimeout(Duration.ofSeconds(2))
                         .buildMemcachedConfig()
                 );

Specifying the Expiry of Items in memcached

There are two ways to specify the Expiry of items that are stored in memcached:

  • A global Time To Live for the items

  • Passing the Time To Live for cached item in the apply method

The below for example will set a default of 30 seconds for all items saved in the cache, for which a TimeToLive has not been specified:

ListenableFuture<String> val = cache.apply("Key1", () → {return "value1";}, executorService);

         CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
                 new ElastiCacheCacheConfigBuilder()
                         .setMemcachedHosts("localhost:11211"))
                         .setTimeToLive(Duration.ofSeconds(30))
                         .buildMemcachedConfig()
                 );

To specify the TTL on a per time basis, specify the Duration when calling the apply method:

ListenableFuture<String> val = cache.apply("Key1", () → {return "value1";}, Duration.ofSeconds(10), executorService);


Setting Wait for memcached Set

When an item is not in the cache, or currently being calculated; the cache will execute the Supplier<V> computation, and store the returned value in memcached. A future has been created and stored in the internal future calculation cache, so that any requests for the same key, wait on the completion of the same future.

With this library the computed cache value is stored asynchronously in memcached, and the future completed with the same value. The future is completed, and removed from the internal future calculation cache ( ConcurrentLinkedHashMap ). Therefore, there is a slim time period, between the completion of the future and the value being saved in memcached. This means a subsequent request for the same key could be a cache miss.

As a result, you can request that the write to memcached be synchronous and a finite period be waited for, for the write to take place. This is done a constructor time, as shown in the following which waits a max of 3 seconds for the set to occur.

         cache = new SpyMemcachedCache<>(
                 new ElastiCacheCacheConfigBuilder()
                         .setMemcachedHosts("localhost:11211"))
                         .setTimeToLive(Duration.ofSeconds(60))
                         .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                         .setWaitForMemcachedSet(true)
                         .setSetWaitDuration(Duration.ofSeconds(3))
                         .buildMemcachedConfig()
         );

         ListenableFuture<String> val = cache.apply("Key1", () -> {
             return "value1";
         }, Duration.ofSeconds(3), executorService);

         assertEquals("Value should be key1","value1", cache.awaitForFutureOrElse(val null));

No Expiry

Items in the cache can have no expiry TTL apply by specifying the duration as ZERO

         CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
                 new ElastiCacheCacheConfigBuilder()
                         .setMemcachedHosts("localhost:11211"))
                         .setTimeToLive(Duration.ofSeconds(60))
                         .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                         .setWaitForMemcachedSet(true)
                         .setSetWaitDuration(Duration.ofSeconds(3))
                         .buildMemcachedConfig()
         );

         ListenableFuture<String> val = cache.apply("Key1", () -> {return "value1";}, Duration.ZERO, executorService);

         assertEquals("Value should be key1","value1", cache.awaitForFutureOrElse(val null));

Cache Key

The cache key has to be a string. Memcached has a requirement for makeup of keys, when using the TEXT protocol, such that your key object must conform to the following requirements.

  • Needs to be a string

  • cannot contain ' '(space), '\r'(return), '\n'(linefeed)

If you are using the BINARY protocol these requirements do not apply. However, you may wish to perform hashing of the string representing the key to allow for any character to be used. The cache has the ability for a couple of hash representations of the key:

  • NONE,

  • NATIVE_XXHASH,

  • JAVA_XXHASH,

  • MD5_UPPER,

  • SHA256_UPPER,

  • MD5_LOWER,

  • SHA256_LOWER

To use either of these you need to specify the hashing method to be used at cache construction time. For the best performance, XXHash is recommended:

cache = new SpyMemcachedCache<>(
                new ElastiCacheCacheConfigBuilder()
                        .setMemcachedHosts("localhost:" + memcached.getPort())
                        .setTimeToLive(Duration.ofSeconds(60))
                        .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                        .setWaitForMemcachedSet(true)
                        .setKeyHashType(KeyHashingType.MD5_LOWER)
                        .buildMemcachedConfig()
        );

Cache Key Prefix

When hashing a key, there is a potential for two different Strings to actually end up with the same Hashed value. As a result you can add a cache prefix to the cache at construction.

The below specifies a cache prefix of article. This will be prepended to the hashed cache key, the method setHashKeyPrefix(false) means that the prefix will be added after the cache key has be hashed. setting setHashKeyPrefix(true) to true means that the prefix will be prepended to the cache key, and then the hashing will take place. This is the default, as the prefix has the potential to break the TEXT protocol key requirements (Hashing the key makes sure this does not occur).

  cache = new SpyMemcachedCache<>(
                new ElastiCacheCacheConfigBuilder()
                        .setMemcachedHosts("localhost:" + memcached.getPort())
                        .setTimeToLive(Duration.ofSeconds(60))
                        .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                        .setWaitForMemcachedSet(true)
                        .setKeyHashType(KeyHashingType.MD5_LOWER)
                        .setKeyPrefix(Optional.of("article"))
                        .setHashKeyPrefix(false)
                        .buildMemcachedConfig()
        );

Chosing Not To Cache

Since 1.0.6 the client (Cache<V>) has the following method:

    public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService,
                                     Predicate<V> canCacheValueEvalutor);

And The CacheWithExpiry<V> contains the method:

    public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive,
                                     ListeningExecutorService executorService,Predicate<V> canCacheValueEvalutor);

These methods allow you to pass a Predicate<V> that you can use to evaluate whether the value returned from the Supplier<V> (the function generating the value to cache), should actually be stored in memcached, etc. This can be useful in situtations where the Supplier<V> is lets say a HystrixCommand object, how value has on this occasion been generated by it’s fallback. The Predicate<V> could wrap the command object an evaluate if the value was from the fallback and choose not to cache:

    apply("webservicecallx",() -> command.execute(),
          (cachevalue) -> {
                    return !command.isResponseFromFallback();
          }
         )

Stale Caching

Since 1.0.1 the client supports a stale caching mechanism; this by default is not enabled as it requires an additional future (via composition) to perform the additional cache lookup. It is also an addition lookup on the memcached server, and also will use x2 the memory (items are stored twice in the cache). Enabling the stale caching feature is done via the .setUseStaleCache(true) method.

The stale caching function is a mini "stale-while-revalidate" mechanism. Without the stale caching enabled, when an item expires in the cache, which is popular; then a lot of requests will be waiting on the cache item to be regenerated from the backend. This means you can have a spike in a larger than you would like requests.

With stale caching enabled, only one request will regenerate the item from the backend cache. The other requests will use a stale cache. The stale cached is ONLY checked if a future exists in the internal cache, meaning that a backend request is in operation to calculate the cache item

With stale caching enabled when an item is stored in memcached, it is stored twice. The 2nd time it is stored under a different key. This key is made up of the hashed cache key, and the stale cache key prefix set via the constructor method .setStaleCachePrefix("staleprefix"). The default value is that of stale.

The item is stored, by default for setTimeToLive longer than the original cache item. To provide a value of your own, say 10 minutes extra, you can specify this at construction time:

        cache = new SpyMemcachedCache<>(
                new ElastiCacheCacheConfigBuilder()
                        .setMemcachedHosts("localhost:" + memcached.getPort())
                        .setTimeToLive(Duration.ofSeconds(1))
                        .setUseStaleCache(true)
                        .setStaleCacheAdditionalTimeToLive(Duration.ofMinutes(10))
                        .setStaleCachePrefix("staleprefix")
                        .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                        .setWaitForMemcachedSet(true)
                        .buildMemcachedConfig()
        );

Stale Caching is available in both SpyMemcachedCache and ElastiCacheMemcachedCache


Metrics

Metric are available in both SpyMemcachedCache and ElastiCacheMemcachedCache as of version 1.0.11. The configuration builder takes an option .setMetricsRecorder(..)' This takes an implementation of org.greencheek.caching.herdcache.memcached.metrics.MetricsRecorder. The default implementation being a `NoOpMetricRecorder. The other implementation is that of the new YammerMetricsRecorder(registry) which uses the yammer metrics library (https://dropwizard.github.io/metrics).

With the YammerMetricsRecorder the following metrics are placed inside the Metrics library:

Method Description

value_calculation_cache_hitrate

The cache hits per second on the internal future cache

value_calculation_cache_missrate

The cache misses per second on the internal future cache

value_calculation_cache_hitcount

The cache hits in total on the internal future cache

value_calculation_cache_misscount

The cache misses in total on the internal future cache

value_calculation_success_count

The number of successful runs of the Supplier<T> function

value_calculation_failure_count

The number of failed runs of the Supplier<T> function

value_calculation_time_timer

The time it has taken to execute the Supplier<T> function

distributed_cache_hitrate

The cache hits per second on the distributed cache (i.e. memcached)

distributed_cache_missrate

The cache misses per second on the distributed cache (i.e. memcached)

distributed_cache_timer

The time it takes to lookup a value from the distributed cache

distributed_cache_count

The number of lookups in the distributed cache that have been performed

distributed_cache_hitcount

The cache hits in total on the distributed cache

distributed_cache_misscount

The cache misses in total on the distributed cache

distributed_cache_writes_count

The writes performed on the distributed cache

stale_distributed_cache_timer

The time it takes to lookup a stale value from the distributed cache

stale_distributed_cache_hitrate

The stale cache hits per second on the distributed cache (i.e. memcached)

stale_distributed_cache_missrate

The stale cache misses per second on the distributed cache (i.e. memcached)

stale_distributed_cache_count

The stale hits performed on the distributed cache (i.e. memcached)

stale_distributed_cache_hitcount

The stale cache hits in total on the distributed cache

stale_distributed_cache_misscount

The stale cache misses in total on the distributed cache

stale_value_calculation_cache_misscount

The cache misses in total on the internal future cache for a stale value

stale_value_calculation_cache_hitcount

The cache hits in total on the internal future cache for a stale value

stale_value_calculation_cache_missrate

The cache misses per second on the internal future cache for stale value

stale_value_calculation_cache_hitrate

The cache hits per second on the internal future cache for stale value


ElastiCacheMemcachedCache (AWS ElastiCache Support)

Since release 1.0.1 there has been support AWS’s ElasticCache memcached cluster:

This is done by creating an instance of ElastiCacheMemcachedCache<V> rather than SpyMemcachedCache<V>. An example is as follows:

CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
                    new ElastiCacheCacheConfigBuilder()
                            .setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
                            .setConfigPollingTime(Duration.ofSeconds(10))
                            .setInitialConfigPollingDelay(Duration.ofSeconds(0))
                            .setTimeToLive(Duration.ofSeconds(2))
                            .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                            .setWaitForMemcachedSet(true)
                            .setDelayBeforeClientClose(Duration.ofSeconds(1))
                            .setDnsConnectionTimeout(Duration.ofSeconds(2))
                            .setUseStaleCache(true)
                            .setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
                            .setRemoveFutureFromInternalCacheBeforeSettingValue(true)
                            .buildElastiCacheMemcachedConfig()
            );

Setting ElastiCache Hosts

The ElastiCache cache works by using the auto discovery mechanism as described here, through that of a ElastiCache Configuration Endpoint, which is described here:

You supply to the ElastiCacheMemcachedCache<V> cache the url of the ElastiCache Configuration Endpoint. The ElastiCache cache uses the netty library (http://netty.io/) to periodically send the config get cluster command to the ElastiCache Configuration Endpoint. The ElastiCache keeps a persistent connection open to the ElastiCache Configuration Endpoint, sending the command periodically. The ElastiCache Configuration Endpoint returns a configuration similar to the following, that details the actually memcached instances that should be connected to:

    CONFIG cluster 0 147
    12
    myCluster.pc4ldq.0001.use1.cache.amazonaws.com|10.82.235.120|11211 myCluster.pc4ldq.0002.use1.cache.amazonaws.com|10.80.249.27|11211

    END

When the version number (the second line) increases a new spy memcached instance is created, and the old spy memcached instance is scheduled for being closed. The ElastiCacheMemcachedCache<V> continuously polls the ElastiCache Configuration Endpoint, for any changes in the number of memcached hosts, or the hosts that are available.

The ElastiCache Configuration Endpoint is specified via the setElastiCacheConfigHosts(String config) method on the ElastiCacheCacheConfigBuilder object:

CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
                    new ElastiCacheCacheConfigBuilder()
                            .setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")

For the moment you should only specify 1 configuration host. Currently a cache cluster is only in one Availability Zone. A cluster cannot at the moment in AWS span multiple Availability Zones. You can have 3 separate elasticache clusters, one in each availability zone, but the cache will only connect to 1 availability zone at any one time.


Specifying the polling time

By default the ElastiCache cache polls the ElastiCache Configuration Endpoint for an update to the nodes that make up the cluster every 60 seconds. This can be configured via the following methods:

  • .setConfigPollingTime(Duration.ofSeconds(10))

  • .setInitialConfigPollingDelay(Duration.ofSeconds(0))

This can be seen in the following example:

private static final CacheWithExpiry cache = new ElastiCacheMemcachedCache<Integer>(
            new ElastiCacheCacheConfigBuilder()
            .setElastiCacheConfigHosts(System.getProperty("hosts","localhost:11211"))
            .setConfigPollingTime(Duration.ofSeconds(Integer.getInteger("pollingTime",60)))
            .setInitialConfigPollingDelay(Duration.ofSeconds(0))
            .setTimeToLive(Duration.ofSeconds(10))
            .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
            .setWaitForMemcachedSet(true)
            .setDelayBeforeClientClose(Duration.ofSeconds(1))
            .setDnsConnectionTimeout(Duration.ofSeconds(2))
            .setUseStaleCache(true)
            .setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
            .setRemoveFutureFromInternalCacheBeforeSettingValue(true)
            .buildElastiCacheMemcachedConfig());

Persistent Connection to ElastiCache Configuration Endpoint

The ElastiCache uses a persistent connection to the ElastiCache Configuration Endpoint. If the connection is lost, the client will automatically reconnect. The client will wait for a period (default 5 seconds) before reconnecting. This can be changed by specifying the delay via the method: .setReconnectDelay(Duration.ofSeconds(10)).

If the client does also receive any data from the ElastiCache Configuration Endpoint, a reconnection will be made; this idle period is controlled by the setting idleReadTimeout. This is set to 125 seconds by default. If you modify this setting you SHOULD NOT set it lower that the polling duration; as you will just end up in the persistent connection not being persistent.

.setReconnectDelay(Duration.ofSeconds(5)) .setIdleReadTimeout(Duration.ofSeconds(125))

If the ElastiCache Configuration Endpoint is in some way returning invalid configurations, then the client will reconnect to the Configuration Endpoint. By default it takes 3 consecutive invalid configurations before the client will reconnect. This is controlled by the method: setNumberOfConsecutiveInvalidConfigurationsBeforeReconnect(int number)

Cluster nodes update and closing old SpyMemcached client

When the ElastiCache Configuration Endpoint, outputs a configuration update a new spy memcached client is created, and the old client is closed. There a delay before the old client is closed, as it may still be in use (i.e. network requests may still be executing). By default the delay is 10 second; this can be change by specifying the following config builder method .setDelayBeforeClientClose(Duration.ofSeconds(1))

ElastiCache Configuration Endpoint timeout

By default the client will wait for 3 seconds for a connection to the ElastiCache Configuration Endpoint. This can be changed by the following following config builder method .setConnectionTimeoutInMillis(Duration.ofSeconds(2))

Host lookup

When the ElastiCache Configuration Endpoint returns the configuration information it returns the hostname, and it may send with it the IP address.

    CONFIG cluster 0 147
    12
    myCluster.pc4ldq.0001.use1.cache.amazonaws.com|10.82.235.120|11211 myCluster.pc4ldq.0002.use1.cache.amazonaws.com|10.80.249.27|11211

    END

If the IP address is not returned the client will perform a DNS lookup on the hostname. By default the timeout is 3 seconds. This can be changed with the builder method .setDnsConnectionTimeout(Duration.ofSeconds(2))

If a DNS lookup fails, due to connection timeout (or a temporary network issue), or otherwise. By default that host will be excluded from the list of memcached hosts the cache client will be connected to. As a result, this will not change unless you update the cluster configuration and a new version of the config is supplied by the ElastiCache Configuration Endpoint.

A builder configuration property .setUpdateConfigVersionOnDnsTimeout(true) allows you to change this behaviour when a host is not resolved to an IP. The resolution of a host’s dns may be a temporary issue, and on the next polling config the dns will be resolvable. If you set the builder property to false .setUpdateConfigVersionOnDnsTimeout(false)

Then the memcached client will be updated to point to the hosts mentioned in the config; but if any host resolution fails; the client will not record the current configuration’s version number. Meaning on the next poll for the current cluster configuration, the memcached client will again be recreated, to point to the hosts specified in configuration.

Note if the dns resolution is constantly failing then client memcached client will constantly be re-created each time the configuration polling occurs. No validation of the previously resolved hosts, and the current resolved hosts is performed. The client will just be recreated.


ElastiCache Configuration Url Endpoint update

This feature is available in 1.0.9 and above.

As previous discussed above, when you create an ElastiCache cache you provide the url to the configuration endpoint:

CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
                    new ElastiCacheCacheConfigBuilder()
                            .setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")

It is possible that you might wish to create another cluster, with a different machine type, and switch the ElastiCache cache over at runtime to the new cluster. For example, you are moving the cache types to faster cpu type machines.

It is possible to do this by constructing a SimpleVolatileBasedElastiCacheConfigServerUpdater and passing that to the builder:

ElastiCacheConfigServerUpdater configUrlUpdater = new SimpleVolatileBasedElastiCacheConfigServerUpdater()

CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
                    new ElastiCacheCacheConfigBuilder()
                            .setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
                            .setConfigUrlUpdater(configUrlUpdater)
                            .buildElastiCacheMemcachedConfig())

You would then code a JMX hook, or admin REST endpoint in your application that called the method: connectionUpdated(String url) to inform the cache that the configuration url has changed, and that it should connect to the new url to obtain the new list of cache cluster nodes.

configUrlUpdater.connectionUpdated("yourcluster.irujgk.0001.euw1.cache.amazonaws.com:11211")

The cache will connect to the new config cluster endpoint and obtain the cache cluster nodes. The cache will wait for setReconnectDelay(Duration.ofSeconds(5)) (5 seconds is the default), before attempting the connection to the new cluster config endpoint. You can reduce or increase this timeout.


ElastiCache Chosing Not to use a Cached Value Predicate

This feature is available in 1.0.15 and above.

This feature allows you to choose whether a cache value should be used or not. An example here would be:

  • You are storing a serialised object with an internal TTL.

  • You store the object in memcached (elasticache), with a 0 TTL (never expire)

  • Herd cache apply(…​) method is used within a Hystrix command execution to either obtain an item from cache, or calculate from backend

  • The Cache value is only used in the Hystrix command execute if the item is Fresh enough (A Predicate<V> is provided to check the TTL)

  • The backend service is currently dead, so the Hystrix command fallback is executed

  • The hystrix command fallback returns the stale item from cache by calling herdcache get(..) method

import java.io.Serializable;

public class Content implements Serializable {
    private static final long serialVersionUID = 1999L;


    private final long creationDateEpoch;
    private final String content;

    public Content(String content) {
        this.creationDateEpoch = System.currentTimeMillis();
        this.content = content;
    }

    public String getContent() {
        return content;
    }

    public long getCreationDateEpoch() {
        return creationDateEpoch;
    }
}

....
....

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import org.greencheek.caching.herdcache.CacheWithExpiry;

import java.util.concurrent.Future;
import java.util.function.Predicate;

public class BackEndRequest extends HystrixCommand<Content> {

    // Static fallback
    private static final Content FALLBACK = new Content("{}");

    // If the value returned by the computation is the FALLBACK, do not cache
    Predicate<Content> backendComputationValueCachable  = (Content value) -> value != FALLBACK;


    // If returned cached value is older than 500ms, do not use the value.  Instead re-calculate it, by calling
    // the backend command
    Predicate<Content> cachedValueAllowed  = (Content value) -> value.getCreationDateEpoch() + System.currentTimeMillis() < 500;

    private final String key;
    private final HttpRestClient client;
    private final CacheWithExpiry<Content> cache;


    public BackEndRequest(String key, RestClient client, CacheWithExpiry<Content> cache) {
        super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("BackEnd"))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10)
                        .withMaxQueueSize(1000))

                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000)
                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                        .withExecutionIsolationThreadInterruptOnTimeout(true)));


        this.key = key;
        this.client = client;
        this.cache = cache;
    }

    @Override
    protected Content run() throws Exception {
        Future<Content> content = cache.apply(key,
                () -> client.get(key),
                com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService(),
                org.greencheek.caching.herdcache.Cache.CAN_ALWAYS_CACHE_VALUE, cachedValueAllowed);


        Content c = content.get();
        if(c==null) {
            throw new RuntimeException("failed to obtain key: " + key);
        } else {
            return c;
        }

    }

    @Override
    protected Content getFallback() {

        Content content = null;
        try {
            content = cache.get(key).get();
            if(content == null) {
                return FALLBACK;
            } else {
                return content;
            }
        } catch (Exception e) {
            return FALLBACK;
        }

    }
}

User Supplied Expiry

Do Not Cross The Streams

Egon Spengler: There's something very important I forgot to tell you.
Peter Venkman: What?
Spengler: Don't cross the streams.
Venkman: Why?
Spengler: It would be bad.
Venkman: I'm fuzzy on the whole good/bad thing. What do you mean, "bad"?
Spengler: Try to imagine all life as you know it stopping instantaneously and every molecule in your body exploding at the speed of light.
Ray Stantz: Total protonic reversal!
Venkman: Right. That's bad. Okay. All right. Important safety tip. Thanks, Egon.

Is your Supplier<V> is a HystrixCommand, and you have coded it’s fallback method to take a reference to the Cache object, and perform a cache.get() of the same key for which the cache apply is running and executing the HystrixCommand, then you are guaranteed failure.

For example, do not do this in a HystrixCommand for the same key that the cache apply is running for.:

    @Override
    protected CacheableItemWithCreationDate<V> getFallback() {
        CacheableItemWithCreationDate<V> contentObj = null;

        try {
            contentObj = cache.get(cacheKey).get();
        } catch (Throwable e) {
            e.printStackTrace();
        }
        return contentObj;
    }

The reason for this is that if your HystrixComamnd’s execute is running as the implmentation of the Supplier<V> interface.The your Hystrix Command’s run() method via the hystrix execute() method. The HystrixCommand (Supplier<V>) is executing in a (asynchronous) Future<V> within herdcache. This Future<V> is stored in an internal map in herdcache as a thundering herd mechanism (https://github.com/tootedom/herdcache#overview), under the key you are looking up.

The Future<V> exists in the internal map, keyed on the given key, until the Supplier<V> command effectively returns a value. For a HystrixCommand’s execute() method, the resulting value is either from the commands run() method or its getFallback().

The problem here is that if you call in your command’s getFallback() the cache.get(key) method you are still "effectively" in the execute() method, and the Future<V> is yet complete (it is still waiting to generate a value from run() or getFallback()). The cache.get(cacheKey).get() will be waiting on exactly the same Future object that was created by herdcache when it initially executed the Supplier<V> (The HystrixCommand’s execute() method). The cache.get(cacheKey).get(); checks the internal map for an already executing Future that mapped to the given key:

In other words:

  • The Supplier<V> (HystrixCommand’s run()) is executing in a Future. Say Future X. This Future X is stored in a Map<String,Future> internally in herdcache, keyed on PID_XYZ

  • If run() fails, this will result in the HystrixCommand’s getFallback() being called.

  • If getFallback() this invokes cache.get("PID_XYZ"). Then the future previously stored in the Map, keyed on "PID_XYZ", is returned from the Map.

  • This is the same Future that is executing run().

  • As a result what you effectively have is a loop.


Return Invalid Object Whilst Revalidate

This is technically an alternate implementation of the stale-while-revalidate pattern. This feature is available in herdcache version 1.0.26, and is a implemented with in the interface RevalidateInBackgroundCapableCache. This interface extends the CacheWithExpiry interface with an additional parameter returnInvalidCachedItemWhileRevalidate on the apply(…​) method:

  public ListenableFuture<V> apply(String key,
                                     Supplier<V> computation,
                                     Duration timeToLive,
                                     ListeningExecutorService executorService,
                                     Predicate<V> canCacheValueEvalutor,Predicate<V> isCachedValueValid,
                                     boolean returnInvalidCachedItemWhileRevalidate);


  public ListenableFuture<V> apply(String key,
                                     Supplier<V> computation,
                                     ListeningExecutorService executorService,
                                     Predicate<V> canCacheValueEvalutor,Predicate<V> isCachedValueValid,
                                     boolean returnInvalidCachedItemWhileRevalidate);

The flow of the apply method, when returnInvalidCachedItemWhileRevalidate is true, is as follows:

  • An item is found in the cache

  • The item is passed to the Predicate<V> isCachedValueValid

  • If the isCachedValueValid predicate returns true then the cache value if returned.

  • However, if the isCachedValueValid predicate returns false then the "invalid" value is returned (set on the Future<V>), while the Supplier<V> computation is submitted to the executorService in order to refresh the item in the cache.


Example Simple ElastiCache Test Class

The below is a simple java main class the can be run on the command line like the following. The below generates a random integer between 1 and 1000, and applys that value to the cache.

java -DmaxRand=1000 -Dmillis=500 -Dhosts=herdtesting.jgkygp.cfg.euw1.cache.amazonaws.com:11211 -jar herd-elastitest-0.1.0-SNAPSHOT-relocated-shade.jar

With the given logback.xml, you would have output on the console that will show if I can hit or not occurred:

71671 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachehit" : "-778756949", "cachetype" : "distributed_cache"}
71671 [pool-1-thread-1] INFO  ElastiCacheTest - Adding cache value : 633
71680 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachehit" : "274176478", "cachetype" : "distributed_cache"}
71680 [pool-1-thread-1] INFO  ElastiCacheTest - Adding cache value : 35
71690 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachemiss" : "65783974", "cachetype" : "distributed_cache"}
71690 [pool-1-thread-1] DEBUG o.g.c.h.m.BaseMemcachedCache - set requested for 65783974
71691 [pool-1-thread-1] INFO  ElastiCacheTest - Adding cache value : 107
package org.greencheek.caching.elasticache;


import com.google.common.util.concurrent.MoreExecutors;
import net.spy.memcached.ConnectionFactoryBuilder;
import org.greencheek.caching.herdcache.CacheWithExpiry;
import org.greencheek.caching.herdcache.memcached.ElastiCacheMemcachedCache;
import org.greencheek.caching.herdcache.memcached.config.builder.ElastiCacheCacheConfigBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;


/**
 *
 */
public class ElastiCacheTest {

  private static final ScheduledExecutorService service  = Executors.newSingleThreadScheduledExecutor();
  private static final Logger logger = LoggerFactory.getLogger("ElastiCacheTest");


  private static final CacheWithExpiry cache = new ElastiCacheMemcachedCache<Integer>(
            new ElastiCacheCacheConfigBuilder()
            .setElastiCacheConfigHosts(System.getProperty("hosts","localhost:11211"))
            .setConfigPollingTime(Duration.ofSeconds(Integer.getInteger("pollingTime",60)))
            .setInitialConfigPollingDelay(Duration.ofSeconds(0))
            .setTimeToLive(Duration.ofSeconds(10))
            .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
            .setWaitForMemcachedSet(true)
            .setDelayBeforeClientClose(Duration.ofSeconds(1))
            .setDnsConnectionTimeout(Duration.ofSeconds(2))
            .setUseStaleCache(true)
            .setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
            .setRemoveFutureFromInternalCacheBeforeSettingValue(true)
            .buildElastiCacheMemcachedConfig());

  public static void main(String[] args) {
      service.scheduleAtFixedRate(()-> {
              int i = randInt(Integer.getInteger("minRand",1),Integer.getInteger("maxRand",2));
              logger.info("Adding cache value : {}",cache.awaitForFutureOrElse(
                      cache.apply(""+i,() -> { return i; },
                        MoreExecutors.sameThreadExecutor()
                      ),
                      MoreExecutors.sameThreadExecutor()),"null");
      }
      ,0,Integer.getInteger("millis",1000),TimeUnit.MILLISECONDS);

  }



  public static int randInt(int min,int max) {

    // NOTE: Usually this should be a field rather than a method
    // variable so that it is not re-seeded every call.
      Random rand = new Random();

    // nextInt is normally exclusive of the top value,
    // so add 1 to make it inclusive
    return rand.nextInt((max - min) + 1) + min;
  }
}


<configuration scan="true" scanPeriod="120 seconds" >
    <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
        <resetJUL>true</resetJUL>
    </contextListener>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
        </encoder>
    </appender>


    <logger name="net.spy" level="WARN"/>

    <root level="DEBUG">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>

Internal Notes

To compile and run perf tests do:

mvn clean test-compile assembly:single
$JAVA_HOME/bin/java -jar target/performancetests-test-jar-with-dependencies.jar

Example output:

Benchmark                                                       Mode  Cnt   Score    Error   Units
PerfTestApplyCommand.applyDefaultKetamaHashAlgoTest            thrpt   40  45.778 ±  4.248  ops/ms
PerfTestApplyCommand.applyDefaultKetamaHashAlgoTestLargeValue  thrpt   40  38.663 ± 11.279  ops/ms
PerfTestApplyCommand.applyFolsomTest                           thrpt   40  37.213 ±  4.314  ops/ms
PerfTestApplyCommand.applyFolsomTestLargeValue                 thrpt   40  33.782 ±  6.222  ops/ms
PerfTestApplyCommand.applyJenkinsHashAlgoTest                  thrpt   40  49.804 ±  8.375  ops/ms
PerfTestApplyCommand.applyJenkinsHashAlgoTestLargeValue        thrpt   40  43.057 ± 10.184  ops/ms
PerfTestApplyCommand.applyNative64XXHashAlgoTest               thrpt   40  47.586 ±  5.329  ops/ms
PerfTestApplyCommand.applyNative64XXHashAlgoTestLargeValue     thrpt   40  38.698 ±  8.471  ops/ms
PerfTestApplyCommand.applyNoKeyHashingJenkinsTest              thrpt   40  56.266 ± 12.331  ops/ms
PerfTestApplyCommand.applyNoKeyHashingJenkinsTestLargeValue    thrpt   40  60.013 ± 22.869  ops/ms
PerfTestApplyCommand.applySHA256HashingJenkinsTest             thrpt   40  43.280 ±  1.106  ops/ms
PerfTestApplyCommand.applySHA256HashingJenkinsTestLargeValue   thrpt   40  31.405 ±  6.456  ops/ms
PerfTestApplyCommand.applyXXHashAlgoTest                       thrpt   40  45.088 ±  3.099  ops/ms
PerfTestApplyCommand.applyXXHashAlgoTestLargeValue             thrpt   40  34.139 ±  6.772  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Compresss                                      thrpt   40   72.708 ±  2.179  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Decompresss                                    thrpt   40  162.476 ±  3.815  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialCompress                                     thrpt   40  105.303 ±  2.542  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialDecompress                                   thrpt   40  173.413 ±  6.217  ops/ms

LZ4PerfTest.compress                             thrpt   40  138.716 ± 1.626  ops/ms
LZ4PerfTest.compressAndDecompress                thrpt   40   98.266 ± 1.031  ops/ms
SnappyCompressionPerfTest.compress               thrpt   40  109.232 ± 4.480  ops/ms
SnappyCompressionPerfTest.compressAndDecompress  thrpt   40   72.289 ± 0.820  ops/ms

About

Cache based on futures. Idea's taken from the spray caching library, and created in java 8

Resources

Stars

Watchers

Forks

Packages

No packages published

Languages