- Overview
- New Features
- Breaking Changes
- Usage
- Cache Types
- Observable
- Non Observable
- Defaults
- SpyMemcachedCache
- ElastiCacheMemcachedCache (AWS ElastiCache Support)
- Setting ElastiCache Hosts
- Specifying the polling time
- Persistent Connection to ElastiCache Configuration Endpoint
- Cluster nodes update and closing old SpyMemcached client
- ElastiCache Configuration Endpoint timeout
- Host lookup
- ElastiCache Configuration Url Endpoint update
- ElastiCache Choosing Not to use a Cached Value Predicate
- User Supplied Expiry
- Return Invalid Object Whilst Revalidate
- Example Simple ElastiCache Test Class
- Internal Notes
The cache borrows heavily from the concepts and implementation of caching in spray-caching.
The idea is that the cache returns a future rather than a value; the future’s value is of a given generic type. The benefit of this approach is that it takes care of the thundering herd (cache miss storm) issue.
This is where many requests for a particular cache key (e.g. a resource URI) arrive at the same time, for example for a popular news story. The result is a lot of requests to the same memcached server, at the same time, to fetch the same value. This puts a huge load on that one memcached server, which could result in an outage. If the item is not in the cache, all the requests end up hitting the upstream services that generate the cache value.
The cache key is then requested (and the processing associated with calculating the value performed) multiple times, which puts the backend resource under undue load.
Returning a future from the cache means that multiple requests for a missing cache value that has not yet been calculated wait on the same future, which is executing for the first request for that cache key.
Returning a future also means that one request for a cache value from memcached can satisfy many requests.
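To make the "wait on the same future" idea concrete, here is a minimal sketch using only the JDK. It is not herdcache’s implementation: the class name CoalescingSketch, the method computationsFor and the use of CompletableFuture (rather than Guava or RxJava types) are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of request coalescing; not part of herdcache.
final class CoalescingSketch<V> {
    // Futures for computations that are currently in flight, keyed by cache key.
    private final ConcurrentMap<String, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();

    CompletableFuture<V> apply(String key, Supplier<V> computation) {
        CompletableFuture<V> created = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, created);
        if (existing != null) {
            return existing; // someone else is already computing this key
        }
        // This caller won the race: compute once, off the calling thread.
        CompletableFuture.supplyAsync(computation).whenComplete((value, error) -> {
            inFlight.remove(key, created); // drop the in-flight entry, then publish the result
            if (error != null) {
                created.completeExceptionally(error);
            } else {
                created.complete(value);
            }
        });
        return created;
    }

    // Issues `callers` requests for one key while the supplier is still blocked,
    // and returns how many times the supplier actually ran.
    static int computationsFor(int callers) {
        CoalescingSketch<String> cache = new CoalescingSketch<>();
        AtomicInteger computations = new AtomicInteger();
        CountDownLatch release = new CountDownLatch(1);
        Supplier<String> slow = () -> {
            computations.incrementAndGet();
            try {
                release.await(); // simulate a slow backend call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "value";
        };
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 0; i < callers; i++) {
            futures.add(cache.apply("key", slow));
        }
        release.countDown();
        for (CompletableFuture<String> f : futures) {
            f.join();
        }
        return computations.get();
    }

    public static void main(String[] args) {
        System.out.println("computations: " + computationsFor(50)); // prints "computations: 1"
    }
}
```

All 50 requests share the future created by the first one, so the expensive supplier runs once.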
2.0.0 sees support for RxJava, and a new implementation of the caching to support RxJava. There is a new interface:
`
public interface ObservableCache<V extends Serializable>
`
Sorry if you wanted RxJava support to be backwards compatible. It was just easier, and more efficient, to implement the cache with RxJava support as a completely new interface and implementation.
The RxJava implementation has been tested with 1.1.1 and 1.2.0. The dependency is 1.2.0. The only reason for this is that Single is no longer beta in 1.2.0 (so I can only guess there are many fixes between 1.1.1 and 1.2.0).
The implementations are SpyObservableMemcachedCache and ElastiCacheObservableMemcachedCache. Methods return Single<CacheItem<V>>.
See the details further on for information regarding RxJava support.
TL;DR: If you are using the defaults, the keys under which items are stored will be different. As a result, expect lots of cache misses.
In all versions < 2, the following was the default for key hashing. By key hashing, I mean the value against which an item is ultimately stored in memcached. If you request an item to be stored under "bob", the key hashing changes this to a fixed width string that is the hash value, e.g. "45867239"
`
KeyHashingType.NATIVE_XXHASH
`
In hindsight (ah, the beauty) this is a stupid default. Why? Key collision. This means that two items you ask to be stored in the cache under different keys could actually be stored under the same key, i.e. collide. The hashing algorithm with the fewest collisions would have been the following (and it is much quicker on 64-bit systems):
`
KeyHashingType.NATIVE_XXHASH_64
`
However, you’d still have the chance of a collision. The best default is NONE: the default should prefer "correctness" over performance. The reason for choosing a hashing algorithm was to remove the "cache key checking" that the memcached client does for each key (MAX_KEY_LENGTH == 250) on any operation. With a fixed length string, the below was not required:
public static void validateKey(final String key, final boolean binary) {
    byte[] keyBytes = KeyUtil.getKeyBytes(key);
    int keyLength = keyBytes.length;
    if (keyLength > MAX_KEY_LENGTH) {
        throw KEY_TOO_LONG_EXCEPTION;
    }
    if (keyLength == 0) {
        throw KEY_EMPTY_EXCEPTION;
    }
    if (!binary) {
        for (byte b : keyBytes) {
            if (b == ' ' || b == '\n' || b == '\r' || b == 0) {
                throw new IllegalArgumentException(
                    "Key contains invalid characters: ``" + key + "''");
            }
        }
    }
}
However, as described, we run the risk of collisions, meaning you could get the unexpected: a value stored under a key that you did not expect.
As a result, the default hash changed to:
KeyHashingType.NONE
The result of this is that no hashing of the key is done. The string you pass is what the key is stored under in memcached, so you need not worry about collisions.
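To make the collision risk concrete, here is a minimal sketch. It does not use xxHash: String.hashCode() stands in as an example of a 32-bit hash, and the class KeyCollisionSketch and the HashMap playing the role of memcached are assumptions for illustration only. The strings "Aa" and "BB" are a well known pair that share the same hashCode.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of hashed-key collisions; not part of herdcache.
final class KeyCollisionSketch {
    // Stand-in for a 32-bit key hash. This is String.hashCode(), NOT xxHash.
    static String hashedKey(String key) {
        return Integer.toString(key.hashCode());
    }

    // Stores two different keys under their hashed form and reads the first one back.
    static String readBackFirstKey() {
        Map<String, String> store = new HashMap<>(); // stand-in for memcached
        store.put(hashedKey("Aa"), "value for Aa");
        store.put(hashedKey("BB"), "value for BB"); // same hashed key, overwrites the entry
        return store.get(hashedKey("Aa"));
    }

    public static void main(String[] args) {
        System.out.println(hashedKey("Aa") + " / " + hashedKey("BB")); // prints "2112 / 2112"
        System.out.println(readBackFirstKey());                        // prints "value for BB"
    }
}
```

A better hash makes this far less likely, but only storing the key verbatim rules it out.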
If you know your keys conform and want to disable the key checking that is done for each operation, you can set the following:
.setKeyValidationType(KeyValidationType.NONE)
This disables the previously mentioned key validation. If you do have a key that doesn’t conform, you will see messages like the following in your logs:
2016-09-24 12:36:11.176 ERROR net.spy.memcached.protocol.ascii.StoreOperationImpl: Error: CLIENT_ERROR bad command line format
The below details how to use the caching implementation, and the various ways to talk to the cache, which fall into 3 categories:
- GET (check if a value is in the cache)
- SET (set a value regardless of whether it exists or not)
- APPLY (only set a value if it doesn’t currently exist)
Each of the write methods, apply and set, allows the value to be calculated from a Supplier<V> function.
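The difference between the three categories can be sketched with a plain in-memory map. This is only an illustration of the semantics: the class CacheOperationsSketch is hypothetical, it is synchronous, and it has none of the futures, expiry or memcached access of the real cache.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical, synchronous illustration of GET / SET / APPLY; not part of herdcache.
final class CacheOperationsSketch {
    private final Map<String, String> store = new ConcurrentHashMap<>(); // stand-in for memcached

    // GET: only looks the key up, never computes anything.
    Optional<String> get(String key) {
        return Optional.ofNullable(store.get(key));
    }

    // SET: always computes and overwrites, whether or not the key exists.
    String set(String key, Supplier<String> value) {
        String computed = value.get();
        store.put(key, computed);
        return computed;
    }

    // APPLY: returns the existing value, or computes and stores one if absent.
    String apply(String key, Supplier<String> computation) {
        return store.computeIfAbsent(key, k -> computation.get());
    }

    public static void main(String[] args) {
        CacheOperationsSketch cache = new CacheOperationsSketch();
        System.out.println(cache.get("k").isPresent());     // prints "false"
        System.out.println(cache.apply("k", () -> "first")); // prints "first"
        System.out.println(cache.apply("k", () -> "other")); // prints "first"
        System.out.println(cache.set("k", () -> "third"));   // prints "third"
    }
}
```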
<dependency>
<groupId>org.greencheek.caching</groupId>
<artifactId>herdcache</artifactId>
<version>2.0.1</version>
</dependency>
Please note that 0.1.0 is not backwards compatible with 1.0.1. 1.0.1 extends the Cache interface to include a couple of get methods, thereby introducing a breaking change with the old API.
There are currently two main groups of interface types: Observable and Non Observable. Observable is the support for RxJava.
The observable interface is ObservableCache<V extends Serializable> and the implementations are:
- SpyObservableMemcachedCache<V extends Serializable>
- ElastiCacheObservableMemcachedCache<V extends Serializable>
The non-observable interfaces are as follows. There are currently two types of cache interface: the Cache<V> interface and the CacheWithExpiry<V> interface, which extends Cache<V>.
The two implementations of CacheWithExpiry<V> are:
- SpyMemcachedCache<V>
- ElastiCacheMemcachedCache<V>
There are a couple of implementations of the Cache<V> interface. However, these are deprecated and will not be discussed further:
- SimpleLastRecentlyUsedCache<V>
- ExpiringLastRecentlyUsedCache<V>
This is new as of 2.0.1, and is an implementation of herd cache using RxJava. The Single<CacheItem<V>> has been tested with version 1.1.1 and 1.2.0, and appears to be working as expected (on my machine, at least).
The new observable cache interface, ObservableCache<V extends Serializable>, is focused on RxJava support. The two implementations:
- SpyObservableMemcachedCache<V extends Serializable>
- ElastiCacheObservableMemcachedCache<V extends Serializable>
implement this interface, which has the methods:
- default public Single<CacheItem<V>> apply(String key, Supplier<V> computation, Duration timeToLive) // Supplier Values always valid, and Cache values always valid
- default public Single<CacheItem<V>> apply(String key, Supplier<V> computation, Duration timeToLive,Predicate<V> isSupplierValueCachable) // Cache values always valid
- default Single<CacheItem<V>> apply(String key, Supplier<V> computation,Predicate<V> isSupplierValueCachable,Predicate<V> isCachedValueValid) // No TTL for cached item
- default public Single<CacheItem<V>> set(String keyString, Supplier<V> value, Duration timeToLive) // Supplier Value is always valid
- default public Single<CacheItem<V>> set(String keyString, V value, Duration timeToLive) // Supplier Value is always valid
- public Single<CacheItem<V>> get(String key);
- public Single<CacheItem<V>> apply(String key, Supplier<V> computation, Duration timeToLive,Predicate<V> isSupplierValueCachable,Predicate<V> isCachedValueValid)
- public Single<CacheItem<V>> set(String keyString, Supplier<V> computation, Duration timeToLive,Predicate<V> canCacheValueEvaluator);
- public Single<Boolean> clear(String key);
- public void shutdown();
All implementations return the RxJava type Single<CacheItem<V>>, which means that either a single value or a Throwable is returned.
The RxJava support does not implement a "Stale While Revalidate" notion, because this is entirely implementable/customizable on the client side.
All of the Single observables that are returned by the implementation are cold observables.
The CacheItem<V> return type is just a wrapper around your V implementation type. V is the object type that is stored/serialised to memcached.
The CacheItem<V> has a few methods to make interacting with the domain object a little less painful.
For example, let’s take the cache get, cache.get(string), which will eventually return a CacheItem<String>, all being well.
CacheItem<String> item = cache.get("string").toBlocking().value();
The get against memcached might either return a value or return "null". Therefore the value returned is wrapped in an Optional<String>.
- To get the Optional:
  Optional<String> v = item.getValue();
  Optional<String> v = item.optional();
- If you would rather just have the value and check for null:
  String v = item.value();
- If you want to return a default value, if 'null' was returned:
  String v = item.value("default");
- If you want to check if nothing was returned:
  item.isEmpty()
- If you want to check if something was returned:
  item.hasValue()
- If you want to obtain the key the item is cached under:
  item.getKey()
- If you want to check if the result was from the cache:
  item.isFromCache()
For the apply and set methods, the actual write to memcached can be either synchronous or asynchronous. By default the memcached write is asynchronous.
- public Single<CacheItem<V>> apply(String key, Supplier<V> computation, Duration timeToLive,Predicate<V> isSupplierValueCachable,Predicate<V> isCachedValueValid)
- public Single<CacheItem<V>> set(String keyString, Supplier<V> computation, Duration timeToLive,Predicate<V> canCacheValueEvaluator);
It is the value of MemcachedCacheConfigBuilder.setWaitForMemcachedSet(true|false) that determines whether the writing of the value to memcached runs on the same scheduler as .subscribeOn or not.
When set to true, the memcached write is performed on the same scheduler that .subscribeOn runs on (http://reactivex.io/documentation/operators/subscribeon.html).
When set to false, the default is for the memcached write to run on the Schedulers.io() thread pool. You can choose the pool the write operates on by setting MemcachedCacheConfigBuilder.setWaitForMemcachedSetRxScheduler(Scheduler scheduler).
Setting it to Schedulers.immediate() is the same as MemcachedCacheConfigBuilder.setWaitForMemcachedSet(true).
When setting MemcachedCacheConfigBuilder.setWaitForMemcachedSet(false) you need to be wary of the default timeout for the memcached write to complete in, which is 2 seconds. If you wish to increase this, set MemcachedCacheConfigBuilder.setWaitDuration(Duration d) to something larger. For example:
.setWaitDuration(Duration.ofSeconds(5))
The ObservableCache returns the domain object CacheItem. This has an isFromCache method that tells you if the apply(…) command returned the value from cache or not. You can use this to start your "stale" cache implementation.
If you want to base your stale caching implementation on the "actual" value you cached, that is entirely possible. See the test org.greencheek.caching.herdcache.memcached.observable.TestExampleStaleWhileRevalidateObservableCache as an example based on the actual value.
Single<CacheItem<String>> val = cache.set("Key1", () -> "SomeValue", Duration.ofSeconds(60));
// The above just creates the Single. Nothing has happened yet. If you do not subscribe,
// the item will never be set in the cache.
// Subscribe (here by blocking) to have the set execute
String itemSet = val.toBlocking().value().value();
If "Key1" already exists in the cache then it is returned. Otherwise the new value is returned, and set in the cache:
Single<CacheItem<String>> val = cache.apply("Key1", () -> "NewValue", Duration.ofSeconds(60));
// The above just creates the Single. Nothing has happened yet. If you do not subscribe,
// the item will never be set in the cache.
// Subscribe (here by blocking) to have the apply execute
String itemSet = val.toBlocking().value().value();
A get only looks the key up in the cache:
Single<CacheItem<String>> val = cache.get("Key1");
// The above just creates the Single. Nothing has happened yet. If you do not subscribe,
// the lookup will never be executed.
// Subscribe (here by blocking) to have the get execute, and return a value
String item = val.toBlocking().value().value();
In all of the above examples, we assume no runtime exceptions occur when the value is obtained from the Supplier<V> (from set(…) and apply(…)). If the Supplier throws an exception, it will be propagated to the Subscriber.
(Exceptions from reading or writing to memcached will NOT be propagated to Subscribers.)
Therefore, if you expect that an exception can be thrown from your supplier, you should pass the appropriate onError action to the subscribe. Example:
class MyFunkyException extends RuntimeException {
}

@Test
public void testRuntimeException() {
    cache = new SpyObservableMemcachedCache<>(
        new ElastiCacheCacheConfigBuilder()
            .setMemcachedHosts("localhost:" + memcached.getPort())
            .setTimeToLive(Duration.ofSeconds(10))
            .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
            .setWaitForMemcachedSet(true)
            .setWaitForRemove(Duration.ofMillis(0))
            .setKeyPrefix(Optional.of("elastic"))
            .buildMemcachedConfig()
    );
    Single<CacheItem<String>> val = cache.set("Key1", () -> {throw new MyFunkyException();} , Duration.ofSeconds(60));
    boolean errorThrown = false;
    try {
        // Blocking on the Single rethrows the Supplier's exception
        String item = val.toBlocking().value().value();
    } catch (MyFunkyException e) {
        errorThrown = true;
    }
    assertTrue("should have thrown custom exception",errorThrown);
    final AtomicBoolean success = new AtomicBoolean(false);
    final AtomicBoolean failure = new AtomicBoolean(false);
    assertEquals(0, memcached.getDaemon().getCache().getCurrentItems());
    val.subscribe(
        new Action1<CacheItem<String>>() {
            @Override
            public void call(CacheItem<String> stringCacheItem) {
                success.set(true);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                if(throwable instanceof MyFunkyException) {
                    failure.set(true);
                }
            }
        });
    assertFalse("should have thrown an exception",success.get());
    assertTrue("should have thrown custom exception", failure.get());
}
If there ever was a confusing subject in RxJava, this is it. subscribeOn determines where the Observable’s code will execute. observeOn determines where the Subscriber’s code will execute.
Let’s say we are running on the main thread and we execute this:
CountDownLatch latch = new CountDownLatch(1);
String value = "value1";
Single<CacheItem<String>> val = cache.apply("Key1", () -> {
    System.out.println("Supplier Value: " + Thread.currentThread().getName());
    return value;
}, Duration.ofSeconds(60));
val.subscribe(calculatedValue -> {
    System.out.println("subscription: " + Thread.currentThread().getName());
    latch.countDown();
});
try {
    latch.await();
} catch (InterruptedException e) {
    e.printStackTrace();
}
The output would be:
Supplier Value: main
subscription: main
The above means that the following all execute on the main thread:
- memcached lookup (get)
- executing the Supplier<V>
- memcached write (set)
- calling the Subscriber’s code
If we now set subscribeOn(Schedulers.io()) on the Single<CacheItem<String>>, the following all execute on the IO scheduler, RxIoScheduler:
- memcached lookup (get)
- executing the Supplier<V>
- memcached write (set)
- calling the Subscriber’s code
CountDownLatch latch = new CountDownLatch(1);
String value = "value1";
Single<CacheItem<String>> val = cache.apply("Key1", () -> {
    System.out.println("Supplier Value: " + Thread.currentThread().getName());
    return value;
}, Duration.ofSeconds(60));
val = val.subscribeOn(Schedulers.io());
val.subscribe(calculatedValue -> {
    System.out.println("subscription: " + Thread.currentThread().getName());
    latch.countDown();
});
try {
    latch.await();
} catch (InterruptedException e) {
    e.printStackTrace();
}
You will be able to see the following in the output.
Supplier Value: RxIoScheduler-2
subscription: RxIoScheduler-2
If we now also set observeOn(Schedulers.computation()) on the Single<CacheItem<String>>, the following execute on the IO scheduler, RxIoScheduler:
- memcached lookup (get)
- executing the Supplier<V>
- memcached write (set)
And the following executes on the computation scheduler, RxComputationScheduler:
- calling the Subscriber’s code
CountDownLatch latch = new CountDownLatch(1);
String value = "value1";
Single<CacheItem<String>> val = cache.apply("Key1", () -> {
    System.out.println("Supplier Value: " + Thread.currentThread().getName());
    return value;
}, Duration.ofSeconds(60));
val = val.subscribeOn(Schedulers.io());
val = val.observeOn(Schedulers.computation());
val.subscribe(calculatedValue -> {
    System.out.println("subscription: " + Thread.currentThread().getName());
    latch.countDown();
});
try {
    latch.await();
} catch (InterruptedException e) {
    e.printStackTrace();
}
You will be able to see the following in the output.
Supplier Value: RxIoScheduler-2
subscription: RxComputationScheduler-1
This is using traditional futures, and is the original implementation of herdcache.
The cache interface, at the beginning of its life, used to have a single method, apply, that took:
- The key to look for
- An implementation of the Supplier<T> functional interface
- The Guava ListeningExecutorService executor
That method was: ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService)
The returned value is a Guava ListenableFuture, upon which you can attach a callback, or wait for a value to be generated.
The cache has now been extended to have more methods, as well as the introduction of a second interface, CacheWithExpiry<V>.
You will most likely work with the CacheWithExpiry interface.
As mentioned, there are 3 types of methods on the interfaces: get, set and apply.
Both get and apply look up a value in the cache that is associated with a key. The difference between get and apply is that apply can generate the value, whilst get only looks it up in the cache. The set, on the other hand, only ever sets a value in the cache.
Both get methods look up a cache value, always returning a Guava ListenableFuture.
The below shows a couple of examples of working with the returned ListenableFuture.
- Adding a callback:
  // Executes on the calling thread
  Futures.addCallback(future, new FutureCallback<String>() {
      @Override
      public void onSuccess(String result) { }
      @Override
      public void onFailure(Throwable t) { }
  });
  // Executes on the passed in executor thread pool
  private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
  Futures.addCallback(val, new FutureCallback<String>() {
      @Override
      public void onSuccess(String result) { }
      @Override
      public void onFailure(Throwable t) { }
  }, executorService);
- Waiting for the value (or failure):
  try {
      future.get();
  } catch (InterruptedException e) {
  } catch (ExecutionException e) {
      // Any exception that occurred in the Supplier will be the .getCause()
  }
More likely than not, you will be interacting with the CacheWithExpiry<V> interface. This interface extends the Cache<V> interface to allow you to provide method level durations for items stored in the cache.
The available methods are:
- public ListenableFuture<V> apply(String key, Supplier<V> computation)
- public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService)
- public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService,Predicate<V> canCacheValueEvalutor)
- public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService,Predicate<V> canCacheValueEvalutor,Predicate<V> isCachedValueUsable);
- public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive, ListeningExecutorService executorService)
- public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive, ListeningExecutorService executorService, Predicate<V> isSupplierValueCachable)
- public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService, Predicate<V> isSupplierValueCachable,Predicate<V> isCachedValueValid)
- public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive, ListeningExecutorService executorService, Predicate<V> isSupplierValueCachable,Predicate<V> isCachedValueValid)
- public ListenableFuture<V> get(String key)
- public ListenableFuture<V> get(String key,ListeningExecutorService executorService)
- public ListenableFuture<V> set(String keyString, V value)
- public ListenableFuture<V> set(String keyString, Supplier<V> value)
- public ListenableFuture<V> set(String keyString, V value, ListeningExecutorService executorService)
- public ListenableFuture<V> set(String keyString, Supplier<V> value, ListeningExecutorService executorService)
- public ListenableFuture<V> set(String keyString, Supplier<V> value, Predicate<V> canCacheValueEvalutor, ListeningExecutorService executorService)
- public ListenableFuture<V> set(String keyString, Supplier<V> computation, Duration timeToLive,Predicate<V> canCacheValueEvaluator,ListeningExecutorService executorService)
- public ListenableFuture<V> set(String keyString, Supplier<V> value, Duration timeToLive)
- public ListenableFuture<V> set(String keyString, V value, Duration timeToLive)
- public ListenableFuture<V> set(String keyString, V value, Duration timeToLive, ListeningExecutorService executorService)
- public ListenableFuture<V> set(String keyString, Supplier<V> value, Duration timeToLive, ListeningExecutorService executorService)
The apply(…) method returns a future that wraps both the lookup of the cache value in memcached and, if no value exists in memcached, the generation of the value from the Supplier<V>.
The Cache<V> interface inherits a utility interface (AwaitOnFuture<V>) that gives you a couple of utility methods that allow you to wait on futures for a value to be calculated:
- V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue)
- V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue, V onTimeoutValue, long duration, TimeUnit timeUnit)
The value returned from a cache apply is a ListenableFuture. You can naturally wait on the currently executing thread (blocking that thread) for a value to be returned. This is as follows:
try {
    return future.get();
} catch (Exception e) {
    return somefallback;
}
The method V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue) removes the ceremony of the try/catch block for you.
The other method, V awaitForFutureOrElse(ListenableFuture<V> future, V onExceptionValue, V onTimeoutValue, long duration, TimeUnit timeUnit), allows you to wait a finite amount of time for a value to be returned. If that amount of time elapses, the onTimeoutValue is returned.
Any other exception results in the onExceptionValue being returned.
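As a rough idea of what such helpers do, here is a sketch written against the JDK’s java.util.concurrent.Future rather than Guava’s ListenableFuture. The class AwaitSketch and the method name awaitOrElse are hypothetical; this is not the library’s AwaitOnFuture<V> code, just the same pattern under those assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration of "await or fall back" helpers; not part of herdcache.
final class AwaitSketch {
    // Blocks until the future completes; any failure yields onExceptionValue.
    static <V> V awaitOrElse(Future<V> future, V onExceptionValue) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return onExceptionValue;
        } catch (ExecutionException | RuntimeException e) {
            return onExceptionValue;
        }
    }

    // Blocks for at most the given duration; a timeout yields onTimeoutValue,
    // any other failure yields onExceptionValue.
    static <V> V awaitOrElse(Future<V> future, V onExceptionValue, V onTimeoutValue,
                             long duration, TimeUnit timeUnit) {
        try {
            return future.get(duration, timeUnit);
        } catch (TimeoutException e) {
            return onTimeoutValue;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return onExceptionValue;
        } catch (ExecutionException | RuntimeException e) {
            return onExceptionValue;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(awaitOrElse(failed, "fallback")); // prints "fallback"

        CompletableFuture<String> neverCompletes = new CompletableFuture<>();
        System.out.println(awaitOrElse(neverCompletes, "fallback", "timed out",
                10, TimeUnit.MILLISECONDS)); // prints "timed out"
    }
}
```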
There are two implementations of the CacheWithExpiry<V> interface:
- SpyMemcachedCache<V>
- ElastiCacheMemcachedCache<V>
The second implementation, ElastiCacheMemcachedCache<V>, is an extension of the SpyMemcachedCache<V> implementation for working with Amazon AWS’s memcached support (known as ElastiCache).
The CacheWithExpiry<V> interface differs from Cache<V> by having a Duration parameter as part of the cache methods. This allows you to specify the duration (length of time) that the item lives in the cache.
Both of these cache classes use the following defaults.
The ElastiCacheCacheConfigBuilder extends the abstract class MemcachedCacheConfigBuilder, which contains the defaults with which the SpyMemcachedCache<V> will execute. The builder allows you to override the defaults.
The following defaults are for both memcached and ElastiCache memcached:
Method | Default | Description
---|---|---
setTimeToLive | Duration.ofSeconds(60) | The default expiry time an item will be given if not specified
setMaxCapacity | 1000 | Max number of futures to cache internally whilst a value is being calculated. This is NOT the max number of items cachable in memcached
setMemcachedHosts | "localhost:11211" | Comma separated host list
setHashingType | ConnectionFactoryBuilder.Locator.CONSISTENT | Using consistent hashing, don’t change
setFailureMode | FailureMode.Redistribute | What should happen when an error occurs (FailureMode.Retry may suit you better for this)
setHashAlgorithm | DefaultHashAlgorithm.KETAMA_HASH | Type of consistent hashing to be used for calculating the memcached node to talk to, don’t change
serializingTranscoder | new FastSerializingTranscoder() | The type of serializer to be used. Class responsible for serialising java objects to a byte stream to store in memcached
protocol | ConnectionFactoryBuilder.Protocol.BINARY | The protocol used for talking to memcached
readBufferSize | DefaultConnectionFactory.DEFAULT_READ_BUFFER_SIZE | Default socket buffer size when talking to memcached, do not change
memcachedGetTimeout | Duration.ofMillis(2500) | When looking in memcached for a matching key, this is the amount of time to wait before timing out
dnsConnectionTimeout | Duration.ofSeconds(3) | When resolving the memcachedHosts to ip addresses, the amount of time to wait for the dns lookup before ignoring that node
waitForMemcachedSet | false | Wait for the write to memcached to occur before removing the future from the internal cache
setWaitDuration | Duration.ofSeconds(2) | Amount of time to wait for the memcached set
keyHashType | KeyHashingType.NONE | How the cache key is hashed. With NONE the key is stored verbatim in memcached; with the other types the key is hashed to a number first, using the chosen hashing algorithm
keyPrefix | Optional.empty() | Whether the key used in lookup should be prefixed with a string, to avoid the unlikely event of a key clash
asciiOnlyKeys | false | We only have ascii keys that will be stored in the cache
hostStringParser | new CommaSeparatedHostAndPortStringParser() | do not change
hostResolver | new AddressByNameHostResolver() | do not change
useStaleCache | false | Whether stale caching is enabled
staleCacheAdditionalTimeToLive | Duration.ZERO | The amount of extra time that items will be stored in the stale cache
staleCachePrefix | "stale" | The prefix for stale keys, to avoid clash
staleMaxCapacity | -1 | The size of the cache for futures for the stale cache is the same as the
staleCacheMemachedGetTimeout | Duration.ZERO | Time to wait for lookups against the stale cache
removeFutureFromInternalCacheBeforeSettingValue | false | When the
metricRecorder | no metric recorder | Can take a new
compressionAlgorithm | SNAPPY | The type of compression algorithm to use when values are stored in memcached. LZ4 is the quickest implementation
herdProtectionEnabled | true | Set to false if you wish to turn off herd cache protection
setKeyValidationType | BY_HASHING_TYPE | If you wish to turn off validation of your keys, as you know they conform (KeyValidationType.NONE)
The following defaults apply just to ElastiCache memcached:
Method | Default | Description
---|---|---
setElastiCacheConfigHosts | "localhost:11211" | The memcached ElastiCache config host name, e.g. yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211
setConfigPollingTime | Duration.ofSeconds(60) | The frequency with which to contact the config host for potential updates to the memcached nodes
setInitialConfigPollingDelay | Duration.ZERO | The delay before the initial poll of the config host to obtain the memcached nodes
setConnectionTimeoutInMillis | Duration.ofMillis(3000) | The time allowed for establishing a connection to the config host before stopping and retrying
setIdleReadTimeout | Duration.ofSeconds(125) | If the client does not receive any data from the ElastiCache Configuration Endpoint, a reconnection will be made; this idle period is controlled by the setting idleReadTimeout
setReconnectDelay | Duration.ofSeconds(5) | The delay between reconnection attempts to the config host
setDelayBeforeClientClose | Duration.ofSeconds(300) | When the ElastiCache Configuration Endpoint outputs a configuration update, a new spy memcached client is created and the old client is closed. There is a delay before the old client is closed, as it may still be in use
setNumberOfConsecutiveInvalidConfigurationsBeforeReconnect | 3 | If the config host returns invalid config this number of times in a row, a reconnection will be made
setUpdateConfigVersionOnDnsTimeout | true | Set to false if you don’t want to acknowledge a config update when dns resolution for any of the memcached nodes failed
setMemcachedClientType | SPY | Default memcached client is that of SPY. Folsom is available, but not fully tested, therefore do not change
setUseFolsomStringClient | false | If we are just storing strings. Folsom specific (do not use)
setFolsomCharset | UTF-8 | do not use
setFolsomConnections | 10 | do not use
setFolsomRequestTimeout | 3000 | do not use
setFolsomMaxOutstandingRequests | |
The SpyMemcachedCache<V> or ElastiCacheMemcachedCache<V> thundering herd protection is made available by the use of an internal cache. The get and apply methods make use of this internal cache. This internal cache has a finite size that is specified by setMaxCapacity. When the get or apply methods are called, this cache is checked for an existing ListenableFuture<V>. If one exists, this means a previous execution of get or apply is running that is either fetching the value from memcached or is pending on the Supplier<V> to generate the value.
If an existing ListenableFuture<V> is available in the internal cache, this is returned to the user. If one does not exist, a new ListenableFuture<V> will be created and returned.
It might be the case that you do not need the herd protection (you have a long tail where no one key is hot), and instead you want to use the Future, Supplier and Predicate functionality of herdcache, without the 'herd'.
This can be done by setting herdProtectionEnabled to false. This replaces the internal cache with a NoOp cache.
The SpyMemcachedCache<V> implementation uses the spy memcached Java library to communicate with memcached. The implementation is similar to that of SimpleLastRecentlyUsedCache in that it uses a ConcurrentLinkedHashMap to store the cache key against an executing future.
When a request comes in for a key, the future is stored in the internal ConcurrentLinkedHashMap:
store.putIfAbsent(keyString, future)
If a subsequent request comes in for the same key, and the future has not yet completed, the existing future in the ConcurrentLinkedHashMap is returned to the caller. This way two requests wait on the same executing Supplier<V> computation.
When constructing the SpyMemcachedCache, you can specify the maximum size of the internal ConcurrentLinkedHashMap that is used to store the concurrently executing futures.
Unlike the SimpleLastRecentlyUsedCache implementation, which keeps completed futures in the ConcurrentLinkedHashMap so that subsequent cache hits can obtain the completed future's value, SpyMemcachedCache<V> removes the key and its associated future from the internal ConcurrentLinkedHashMap once the future completes. The value of the completed future is instead stored in memcached for subsequent retrieval.
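The putIfAbsent pattern described above can be sketched with JDK classes only. This is a minimal illustration, not herdcache's implementation: HerdProtectionSketch is a hypothetical name, ConcurrentHashMap and CompletableFuture stand in for ConcurrentLinkedHashMap and ListenableFuture, and the memcached lookup, the write to memcached and the maximum capacity are all left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical sketch of the in-flight future map; not a herdcache class.
final class HerdProtectionSketch<V> {
    // Futures that are still being computed, keyed by cache key.
    private final ConcurrentMap<String, CompletableFuture<V>> store = new ConcurrentHashMap<>();

    CompletableFuture<V> apply(String key, Supplier<V> computation, Executor executor) {
        CompletableFuture<V> future = new CompletableFuture<>();
        CompletableFuture<V> existing = store.putIfAbsent(key, future);
        if (existing != null) {
            // Another caller is already computing this key: share its future.
            return existing;
        }
        executor.execute(() -> {
            try {
                V value = computation.get();
                // The entry is removed once the computation finishes
                // (assumption: removal happens just before completion here).
                store.remove(key, future);
                future.complete(value);
            } catch (Throwable t) {
                store.remove(key, future);
                future.completeExceptionally(t);
            }
        });
        return future;
    }

    // Number of computations currently in flight.
    int inFlight() {
        return store.size();
    }
}
```

Two calls to apply for the same key, made before the first computation has finished, receive the same future object, so the Supplier runs once.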
Before the Supplier<V> computation is submitted to the passed executor for execution, the memcached cluster is checked for the existence of a value for the given key. If a value is present in memcached, the returned future is set with the obtained value. This means that if two requests come in for the same key, for which a value is present in memcached, they wait on the same future to have its value set to that of the memcached cache hit.
If a value does not exist in memcached, the given Supplier<V> computation is submitted to the provided executor for execution. Once the value has been calculated, it is sent over the network to memcached for storage.
With this library the value is stored asynchronously in memcached, the future is completed with the computed value, and the future is subsequently removed from the ConcurrentLinkedHashMap. Therefore, there is a slim time period between the completion of the future and the value being saved in memcached. This means a subsequent request for the same key could be a cache miss.
When constructing the SpyMemcachedCache, it is possible to specify a period of time to wait for the set to occur (i.e. make the asynchronous set into memcached semi-synchronous).
The SpyMemcachedCache is created by passing a MemcachedCacheConfig. A MemcachedCacheConfig is created via an ElastiCacheCacheConfigBuilder, which contains the method public MemcachedCacheConfig buildMemcachedConfig() that builds the CacheConfig for both the ElastiCacheMemcachedCache and the SpyMemcachedCache.
The following shows various ways of configuring the cache:
cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211")
.setTimeToLive(Duration.ofSeconds(60))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.buildMemcachedConfig()
);
ListenableFuture<String> val = cache.apply("Key1", () -> {
return "value1";
}, Duration.ofSeconds(3), executorService);
assertEquals("Value should be key1","value1", cache.awaitForFutureOrElse(val, null));
By default the host string is localhost:11211; however, you can specify a number of hosts to connect to by specifying them as a comma-separated string in the builder:
CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211,localhost:11212,localhost:11213")
.buildMemcachedConfig()
);
When the SpyMemcachedCache is passed the list of memcached hosts, the IP address for each host needs to be resolved. By default, up to 3 seconds is waited, per host, to obtain the IP address. This can be controlled as follows:
CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211,localhost:11212,localhost:11213")
.setDnsConnectionTimeout(Duration.ofSeconds(2))
.buildMemcachedConfig()
);
There are two ways to specify the expiry of items that are stored in memcached:
- A global Time To Live for the items
- Passing the Time To Live for the cached item in the apply method
The example below sets a default of 30 seconds for all items saved in the cache for which a TimeToLive has not been specified:
CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211")
.setTimeToLive(Duration.ofSeconds(30))
.buildMemcachedConfig()
);
ListenableFuture<String> val = cache.apply("Key1", () -> {return "value1";}, executorService);
To specify the TTL on a per item basis, specify the Duration when calling the apply method:
ListenableFuture<String> val = cache.apply("Key1", () -> {return "value1";}, Duration.ofSeconds(10), executorService);
When an item is not in the cache, and is not currently being calculated, the cache will execute the Supplier<V> computation and store the returned value in memcached. A future is created and stored in the internal future calculation cache, so that any requests for the same key wait on the completion of the same future.
With this library the computed cache value is stored asynchronously in memcached, and the future is completed with the same value. The future is completed, and removed from the internal future calculation cache (ConcurrentLinkedHashMap). Therefore, there is a slim time period between the completion of the future and the value being saved in memcached. This means a subsequent request for the same key could be a cache miss.
As a result, you can request that the write to memcached be synchronous, with a finite period waited for the write to take place. This is done at construction time, as shown in the following, which waits a maximum of 3 seconds for the set to occur.
cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211")
.setTimeToLive(Duration.ofSeconds(60))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setSetWaitDuration(Duration.ofSeconds(3))
.buildMemcachedConfig()
);
ListenableFuture<String> val = cache.apply("Key1", () -> {
return "value1";
}, Duration.ofSeconds(3), executorService);
assertEquals("Value should be key1","value1", cache.awaitForFutureOrElse(val, null));
Items in the cache can have no expiry TTL applied, by specifying the duration as ZERO:
CacheWithExpiry<String> cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:11211")
.setTimeToLive(Duration.ofSeconds(60))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setSetWaitDuration(Duration.ofSeconds(3))
.buildMemcachedConfig()
);
ListenableFuture<String> val = cache.apply("Key1", () -> {return "value1";}, Duration.ZERO, executorService);
assertEquals("Value should be key1","value1", cache.awaitForFutureOrElse(val, null));
The cache key has to be a string. Memcached has requirements for the makeup of keys when using the TEXT protocol, so your key must conform to the following:
- Needs to be a string
- Cannot contain ' ' (space), '\r' (return), '\n' (linefeed)
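The TEXT protocol key rules listed above can be checked before a key is handed to the cache. The following is a small sketch: TextProtocolKeySketch and isValidTextKey are hypothetical names, not part of herdcache, and rejecting null or empty keys is an extra assumption.

```java
// Hypothetical helper; not part of herdcache.
final class TextProtocolKeySketch {
    // Returns true when the key contains none of the characters ruled out above.
    static boolean isValidTextKey(String key) {
        if (key == null || key.isEmpty()) {
            return false; // assumption: an empty key is never useful
        }
        for (int i = 0; i < key.length(); i++) {
            char c = key.charAt(i);
            if (c == ' ' || c == '\r' || c == '\n') {
                return false;
            }
        }
        return true;
    }
}
```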
If you are using the BINARY protocol these requirements do not apply. However, you may wish to hash the string representing the key to allow any character to be used. The cache supports several hash representations of the key:
- NONE
- NATIVE_XXHASH
- JAVA_XXHASH
- MD5_UPPER
- SHA256_UPPER
- MD5_LOWER
- SHA256_LOWER
To use any of these you need to specify the hashing method at cache construction time. For the best performance, XXHash is recommended:
cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:" + memcached.getPort())
.setTimeToLive(Duration.ofSeconds(60))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setKeyHashType(KeyHashingType.MD5_LOWER)
.buildMemcachedConfig()
);
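To illustrate what the MD5 and SHA256 options plausibly produce, here is a sketch using the JDK's MessageDigest. It assumes the key is encoded as UTF-8 and rendered as a hex string, which may differ from what herdcache does internally; KeyHashSketch and hexDigest are hypothetical names. XXHash is not shown because it needs a third-party library.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Locale;

// Hypothetical helper; not part of herdcache.
final class KeyHashSketch {
    // Hashes the key with the named JDK algorithm ("MD5" or "SHA-256") and hex encodes it.
    static String hexDigest(String algorithm, String key, boolean upperCase) {
        try {
            MessageDigest digest = MessageDigest.getInstance(algorithm);
            byte[] bytes = digest.digest(key.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(bytes.length * 2);
            for (byte b : bytes) {
                hex.append(String.format("%02x", b & 0xff));
            }
            String lower = hex.toString();
            return upperCase ? lower.toUpperCase(Locale.ROOT) : lower;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("algorithm not available: " + algorithm, e);
        }
    }
}
```

Whatever characters the original key contains, the hashed form only contains hex digits, so it satisfies the TEXT protocol key requirements.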
When hashing a key, there is a potential for two different strings to end up with the same hashed value. As a result you can add a cache prefix to the cache at construction.
The example below specifies a cache prefix of article. Calling setHashKeyPrefix(false) means that the prefix is prepended after the cache key has been hashed. Calling setHashKeyPrefix(true) means that the prefix is prepended to the cache key, and then the hashing takes place. The latter is the default, as the prefix has the potential to break the TEXT protocol key requirements (hashing the key makes sure this does not occur).
cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:" + memcached.getPort())
.setTimeToLive(Duration.ofSeconds(60))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setKeyHashType(KeyHashingType.MD5_LOWER)
.setKeyPrefix(Optional.of("article"))
.setHashKeyPrefix(false)
.buildMemcachedConfig()
);
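The difference between the two setHashKeyPrefix modes can be sketched as follows. This is an illustration only: KeyPrefixSketch is a hypothetical class, and joining the prefix and key by plain concatenation (with no separator) is an assumption about the key layout.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper; not part of herdcache.
final class KeyPrefixSketch {
    // Lower-case hex MD5 of the UTF-8 bytes of the input.
    static String md5Lower(String s) {
        try {
            byte[] bytes = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(bytes.length * 2);
            for (byte b : bytes) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // hashKeyPrefix = true : prefix and key are hashed together.
    // hashKeyPrefix = false: only the key is hashed, the prefix stays readable.
    static String cacheKey(String prefix, String key, boolean hashKeyPrefix) {
        return hashKeyPrefix ? md5Lower(prefix + key) : prefix + md5Lower(key);
    }
}
```

With hashKeyPrefix set to false the stored key still starts with the literal prefix, which is why a prefix containing a space or newline would break the TEXT protocol rules in that mode.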
Since 1.0.6 the client (Cache<V>) has the following method:
public ListenableFuture<V> apply(String key, Supplier<V> computation, ListeningExecutorService executorService,
Predicate<V> canCacheValueEvalutor);
And the CacheWithExpiry<V> contains the method:
public ListenableFuture<V> apply(String key, Supplier<V> computation, Duration timeToLive,
ListeningExecutorService executorService,Predicate<V> canCacheValueEvalutor);
These methods allow you to pass a Predicate<V> that evaluates whether the value returned from the Supplier<V> (the function generating the value to cache) should actually be stored in memcached. This can be useful in situations where the Supplier<V> is, say, a HystrixCommand object whose value has on this occasion been generated by its fallback. The Predicate<V> could wrap the command object and evaluate whether the value was from the fallback, and choose not to cache it:
cache.apply("webservicecallx", () -> command.execute(),
executorService,
(cachevalue) -> {
return !command.isResponseFromFallback();
}
);
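The effect of the predicate can be sketched without Hystrix or memcached. In this simplified, synchronous illustration a plain map stands in for memcached, and CanCachePredicateSketch is a hypothetical class rather than a herdcache type.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical sketch; a map stands in for memcached.
final class CanCachePredicateSketch {
    private final Map<String, String> store = new ConcurrentHashMap<>();

    String apply(String key, Supplier<String> computation, Predicate<String> canCacheValue) {
        String cached = store.get(key);
        if (cached != null) {
            return cached; // cache hit: the computation is not run
        }
        String value = computation.get();
        if (canCacheValue.test(value)) {
            store.put(key, value); // only values the predicate accepts are stored
        }
        return value;
    }

    boolean isCached(String key) {
        return store.containsKey(key);
    }
}
```

The caller still receives the fallback value; it is simply not written to the cache, so the next request tries the real computation again.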
Since 1.0.1 the client supports a stale caching mechanism. It is not enabled by default, as it requires an additional future (via composition) to perform the additional cache lookup. It is also an additional lookup on the memcached server, and will use twice the memory (items are stored twice in the cache).
The stale caching feature is enabled via the .setUseStaleCache(true) method.
The stale caching function is a mini "stale-while-revalidate" mechanism. Without stale caching enabled, when a popular item expires in the cache, a lot of requests will be waiting on the cache item to be regenerated from the backend. This means you can have a larger spike in waiting requests than you would like.
With stale caching enabled, only one request will regenerate the item from the backend. The other requests will use the stale cache. The stale cache is ONLY checked if a future exists in the internal cache, meaning that a backend request is in operation to calculate the cache item.
With stale caching enabled, when an item is stored in memcached it is stored twice. The second time it is stored under a different key. This key is made up of the hashed cache key and the stale cache key prefix, set via the method .setStaleCachePrefix("staleprefix"). The default value is stale.
By default, the item is stored for setTimeToLive longer than the original cache item.
To provide a value of your own, say 10 minutes extra, you can specify this at construction time:
cache = new SpyMemcachedCache<>(
new ElastiCacheCacheConfigBuilder()
.setMemcachedHosts("localhost:" + memcached.getPort())
.setTimeToLive(Duration.ofSeconds(1))
.setUseStaleCache(true)
.setStaleCacheAdditionalTimeToLive(Duration.ofMinutes(10))
.setStaleCachePrefix("staleprefix")
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.buildMemcachedConfig()
);
Stale caching is available in both SpyMemcachedCache and ElastiCacheMemcachedCache.
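The "stored twice" behaviour can be sketched with an in-memory map and explicit timestamps. This is an illustration under assumptions: StaleCacheSketch is hypothetical, the stale key is formed by simple concatenation of prefix and key, and the clock is passed in as a parameter so the behaviour is easy to follow.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; a map with expiry times stands in for memcached.
final class StaleCacheSketch {
    private static final class Entry {
        final String value;
        final long expiresAtMillis;
        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();
    private final String stalePrefix;
    private final Duration timeToLive;
    private final Duration staleAdditionalTimeToLive;

    StaleCacheSketch(String stalePrefix, Duration timeToLive, Duration staleAdditionalTimeToLive) {
        this.stalePrefix = stalePrefix;
        this.timeToLive = timeToLive;
        this.staleAdditionalTimeToLive = staleAdditionalTimeToLive;
    }

    // Stores the value twice: under the key, and under the stale key with a longer TTL.
    void set(String key, String value, long nowMillis) {
        long expiry = nowMillis + timeToLive.toMillis();
        store.put(key, new Entry(value, expiry));
        store.put(stalePrefix + key, new Entry(value, expiry + staleAdditionalTimeToLive.toMillis()));
    }

    // Returns the value, or null when it is missing or expired.
    String get(String key, long nowMillis) {
        Entry e = store.get(key);
        return (e != null && nowMillis < e.expiresAtMillis) ? e.value : null;
    }

    String getStale(String key, long nowMillis) {
        return get(stalePrefix + key, nowMillis);
    }
}
```

After the primary entry expires, the stale entry is still readable for the additional time to live, which is what lets waiting requests be served while one request regenerates the item.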
Metrics are available in both SpyMemcachedCache and ElastiCacheMemcachedCache as of version 1.0.11. The configuration builder takes an option .setMetricsRecorder(..). This takes an implementation of org.greencheek.caching.herdcache.memcached.metrics.MetricsRecorder. The default implementation is NoOpMetricRecorder. The other implementation is YammerMetricsRecorder, created with new YammerMetricsRecorder(registry), which uses the yammer metrics library (https://dropwizard.github.io/metrics).
With the YammerMetricsRecorder the following metrics are placed inside the Metrics library:
Metric | Description |
---|---|
value_calculation_cache_hitrate | The cache hits per second on the internal future cache |
value_calculation_cache_missrate | The cache misses per second on the internal future cache |
value_calculation_cache_hitcount | The cache hits in total on the internal future cache |
value_calculation_cache_misscount | The cache misses in total on the internal future cache |
value_calculation_success_count | The number of successful runs of the Supplier<T> function |
value_calculation_failure_count | The number of failed runs of the Supplier<T> function |
value_calculation_time_timer | The time it has taken to execute the Supplier<T> function |
distributed_cache_hitrate | The cache hits per second on the distributed cache (i.e. memcached) |
distributed_cache_missrate | The cache misses per second on the distributed cache (i.e. memcached) |
distributed_cache_timer | The time it takes to look up a value from the distributed cache |
distributed_cache_count | The number of lookups in the distributed cache that have been performed |
distributed_cache_hitcount | The cache hits in total on the distributed cache |
distributed_cache_misscount | The cache misses in total on the distributed cache |
distributed_cache_writes_count | The writes performed on the distributed cache |
stale_distributed_cache_timer | The time it takes to look up a stale value from the distributed cache |
stale_distributed_cache_hitrate | The stale cache hits per second on the distributed cache (i.e. memcached) |
stale_distributed_cache_missrate | The stale cache misses per second on the distributed cache (i.e. memcached) |
stale_distributed_cache_count | The stale hits performed on the distributed cache (i.e. memcached) |
stale_distributed_cache_hitcount | The stale cache hits in total on the distributed cache |
stale_distributed_cache_misscount | The stale cache misses in total on the distributed cache |
stale_value_calculation_cache_misscount | The cache misses in total on the internal future cache for a stale value |
stale_value_calculation_cache_hitcount | The cache hits in total on the internal future cache for a stale value |
stale_value_calculation_cache_missrate | The cache misses per second on the internal future cache for a stale value |
stale_value_calculation_cache_hitrate | The cache hits per second on the internal future cache for a stale value |
Since release 1.0.1 there has been support for AWS's ElastiCache memcached clusters. This is done by creating an instance of ElastiCacheMemcachedCache<V> rather than SpyMemcachedCache<V>. An example is as follows:
CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
.setConfigPollingTime(Duration.ofSeconds(10))
.setInitialConfigPollingDelay(Duration.ofSeconds(0))
.setTimeToLive(Duration.ofSeconds(2))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setDelayBeforeClientClose(Duration.ofSeconds(1))
.setDnsConnectionTimeout(Duration.ofSeconds(2))
.setUseStaleCache(true)
.setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
.setRemoveFutureFromInternalCacheBeforeSettingValue(true)
.buildElastiCacheMemcachedConfig()
);
The ElastiCache cache works by using the ElastiCache auto discovery mechanism, through an ElastiCache Configuration Endpoint.
You supply the ElastiCacheMemcachedCache<V> cache with the url of the ElastiCache Configuration Endpoint. The ElastiCache cache uses the netty library (http://netty.io/) to periodically send the config get cluster command to the ElastiCache Configuration Endpoint. The ElastiCache cache keeps a persistent connection open to the ElastiCache Configuration Endpoint, sending the command periodically. The ElastiCache Configuration Endpoint returns a configuration similar to the following, which details the actual memcached instances that should be connected to:
CONFIG cluster 0 147
12
myCluster.pc4ldq.0001.use1.cache.amazonaws.com|10.82.235.120|11211 myCluster.pc4ldq.0002.use1.cache.amazonaws.com|10.80.249.27|11211
END
When the version number (the second line) increases, a new spy memcached instance is created and the old spy memcached instance is scheduled to be closed. The ElastiCacheMemcachedCache<V> continuously polls the ElastiCache Configuration Endpoint for any changes in the number of memcached hosts, or the hosts that are available.
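To make the response format concrete, the following sketch parses a response shaped like the sample above into a version number and a list of nodes. It is not the parser herdcache uses: ClusterConfigSketch and its members are hypothetical, and the layout (header line, version line, one line of space-separated host|ip|port entries, END) is taken only from the sample.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for the sample response shown above.
final class ClusterConfigSketch {
    static final class Node {
        final String host;
        final String ip; // may be empty when the endpoint sends no IP address
        final int port;
        Node(String host, String ip, int port) {
            this.host = host;
            this.ip = ip;
            this.port = port;
        }
    }

    final long version;
    final List<Node> nodes;

    private ClusterConfigSketch(long version, List<Node> nodes) {
        this.version = version;
        this.nodes = nodes;
    }

    static ClusterConfigSketch parse(String response) {
        String[] lines = response.split("\\r?\\n");
        if (lines.length < 3 || !lines[0].startsWith("CONFIG ")) {
            throw new IllegalArgumentException("not a config response");
        }
        long version = Long.parseLong(lines[1].trim()); // second line: version number
        List<Node> nodes = new ArrayList<>();
        for (String entry : lines[2].trim().split("\\s+")) {
            if (entry.isEmpty()) {
                continue;
            }
            String[] parts = entry.split("\\|", -1); // keep an empty IP field
            if (parts.length != 3) {
                throw new IllegalArgumentException("bad node entry: " + entry);
            }
            nodes.add(new Node(parts[0], parts[1], Integer.parseInt(parts[2])));
        }
        return new ClusterConfigSketch(version, nodes);
    }
}
```

A poller would compare the parsed version with the last one it acted on and only rebuild the memcached client when the version has increased.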
The ElastiCache Configuration Endpoint is specified via the setElastiCacheConfigHosts(String config) method on the ElastiCacheCacheConfigBuilder object:
CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
For the moment you should only specify 1 configuration host. Currently a cache cluster is only in one Availability Zone. A cluster cannot at the moment in AWS span multiple Availability Zones. You can have 3 separate elasticache clusters, one in each availability zone, but the cache will only connect to 1 availability zone at any one time.
By default the ElastiCache cache polls the ElastiCache Configuration Endpoint for an update to the nodes that make up the cluster every 60 seconds. This can be configured via the following methods:
- .setConfigPollingTime(Duration.ofSeconds(10))
- .setInitialConfigPollingDelay(Duration.ofSeconds(0))
This can be seen in the following example:
private static final CacheWithExpiry<Integer> cache = new ElastiCacheMemcachedCache<Integer>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts(System.getProperty("hosts","localhost:11211"))
.setConfigPollingTime(Duration.ofSeconds(Integer.getInteger("pollingTime",60)))
.setInitialConfigPollingDelay(Duration.ofSeconds(0))
.setTimeToLive(Duration.ofSeconds(10))
.setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
.setWaitForMemcachedSet(true)
.setDelayBeforeClientClose(Duration.ofSeconds(1))
.setDnsConnectionTimeout(Duration.ofSeconds(2))
.setUseStaleCache(true)
.setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
.setRemoveFutureFromInternalCacheBeforeSettingValue(true)
.buildElastiCacheMemcachedConfig());
The ElastiCache cache uses a persistent connection to the ElastiCache Configuration Endpoint. If the connection is lost, the client will automatically reconnect. The client waits for a period (default 5 seconds) before reconnecting. This can be changed by specifying the delay via the method .setReconnectDelay(Duration.ofSeconds(10)).
If the client does not receive any data from the ElastiCache Configuration Endpoint, a reconnection will be made; this idle period is controlled by the setting idleReadTimeout, which is 125 seconds by default.
If you modify this setting you SHOULD NOT set it lower than the polling duration, as the persistent connection will then no longer be persistent.
.setReconnectDelay(Duration.ofSeconds(5))
.setIdleReadTimeout(Duration.ofSeconds(125))
If the ElastiCache Configuration Endpoint is in some way returning invalid configurations,
then the client will reconnect to the Configuration Endpoint. By default it takes 3 consecutive invalid
configurations before the client will reconnect. This is controlled by the method:
setNumberOfConsecutiveInvalidConfigurationsBeforeReconnect(int number)
When the ElastiCache Configuration Endpoint outputs a configuration update, a new spy memcached client is created and the old client is closed. There is a delay before the old client is closed, as it may still be in use (i.e. network requests may still be executing). By default the delay is 10 seconds; this can be changed via the config builder method .setDelayBeforeClientClose(Duration.ofSeconds(1)).
By default the client will wait for 3 seconds for a connection to the ElastiCache Configuration Endpoint. This can be changed via the config builder method .setConnectionTimeoutInMillis(Duration.ofSeconds(2)).
When the ElastiCache Configuration Endpoint returns the configuration information it returns the hostname, and it may send with it the IP address.
CONFIG cluster 0 147
12
myCluster.pc4ldq.0001.use1.cache.amazonaws.com|10.82.235.120|11211 myCluster.pc4ldq.0002.use1.cache.amazonaws.com|10.80.249.27|11211
END
If the IP address is not returned, the client will perform a DNS lookup on the hostname. By default the timeout is 3 seconds. This can be changed with the builder method .setDnsConnectionTimeout(Duration.ofSeconds(2)).
If a DNS lookup fails, due to a connection timeout (or a temporary network issue) or otherwise, then by default that host will be excluded from the list of memcached hosts the cache client connects to. This will not change unless you update the cluster configuration and a new version of the config is supplied by the ElastiCache Configuration Endpoint.
The builder configuration property .setUpdateConfigVersionOnDnsTimeout(true) allows you to change this behaviour when a host is not resolved to an IP. The failure to resolve a host's DNS may be a temporary issue, and on the next config poll the DNS may be resolvable. If you set the builder property to false, .setUpdateConfigVersionOnDnsTimeout(false), then the memcached client will be updated to point to the hosts mentioned in the config, but if any host resolution fails the client will not record the current configuration's version number. This means that on the next poll for the current cluster configuration, the memcached client will again be recreated to point to the hosts specified in the configuration.
Note that if the DNS resolution is constantly failing then the memcached client will constantly be recreated each time the configuration polling occurs. No comparison of the previously resolved hosts and the currently resolved hosts is performed. The client will just be recreated.
This feature is available in 1.0.9 and above.
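The version bookkeeping described above can be sketched as a small state holder. This is one reading of the described behaviour, not herdcache's code: ConfigVersionTrackerSketch and its method names are hypothetical.

```java
// Hypothetical sketch of the version bookkeeping; not a herdcache class.
final class ConfigVersionTrackerSketch {
    private final boolean updateConfigVersionOnDnsTimeout;
    private long lastRecordedVersion = Long.MIN_VALUE;

    ConfigVersionTrackerSketch(boolean updateConfigVersionOnDnsTimeout) {
        this.updateConfigVersionOnDnsTimeout = updateConfigVersionOnDnsTimeout;
    }

    // A polled config triggers a new client only when its version is newer
    // than the last version that was recorded.
    boolean shouldRecreateClient(long polledVersion) {
        return polledVersion > lastRecordedVersion;
    }

    // Called after the client was rebuilt from the polled config.
    // With the flag set to false, a DNS failure leaves the version unrecorded,
    // so the same config version is applied again on the next poll.
    void clientRecreated(long polledVersion, boolean anyDnsLookupFailed) {
        if (!anyDnsLookupFailed || updateConfigVersionOnDnsTimeout) {
            lastRecordedVersion = polledVersion;
        }
    }
}
```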
As discussed above, when you create an ElastiCache cache you provide the url to the configuration endpoint:
CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
It is possible that you might wish to create another cluster, with a different machine type, and switch the ElastiCache cache over at runtime to the new cluster. For example, you may be moving the cache to machines with a faster CPU type.
It is possible to do this by constructing a SimpleVolatileBasedElastiCacheConfigServerUpdater and passing that to the builder:
ElastiCacheConfigServerUpdater configUrlUpdater = new SimpleVolatileBasedElastiCacheConfigServerUpdater();
CacheWithExpiry<String> cache = new ElastiCacheMemcachedCache<String>(
new ElastiCacheCacheConfigBuilder()
.setElastiCacheConfigHosts("yourcluster.jgkygp.0001.euw1.cache.amazonaws.com:11211")
.setConfigUrlUpdater(configUrlUpdater)
.buildElastiCacheMemcachedConfig());
You would then code a JMX hook, or admin REST endpoint, in your application that calls the method connectionUpdated(String url) to inform the cache that the configuration url has changed, and that it should connect to the new url to obtain the new list of cache cluster nodes.
configUrlUpdater.connectionUpdated("yourcluster.irujgk.0001.euw1.cache.amazonaws.com:11211");
The cache will connect to the new config cluster endpoint and obtain the cache cluster nodes. The cache will wait for the duration set by setReconnectDelay(Duration.ofSeconds(5)) (5 seconds is the default) before attempting the connection to the new cluster config endpoint. You can reduce or increase this delay.
This feature is available in 1.0.15 and above.
This feature allows you to choose whether a cached value should be used or not. An example scenario would be:
- You are storing a serialised object with an internal TTL.
- You store the object in memcached (ElastiCache) with a 0 TTL (never expire).
- The herdcache apply(…) method is used within a Hystrix command execution to either obtain an item from the cache, or calculate it from the backend.
- The cached value is only used in the Hystrix command execution if the item is fresh enough (a Predicate<V> is provided to check the TTL).
- The backend service is currently dead, so the Hystrix command fallback is executed.
- The Hystrix command fallback returns the stale item from the cache by calling the herdcache get(..) method.
import java.io.Serializable;
public class Content implements Serializable {
private static final long serialVersionUID = 1999L;
private final long creationDateEpoch;
private final String content;
public Content(String content) {
this.creationDateEpoch = System.currentTimeMillis();
this.content = content;
}
public String getContent() {
return content;
}
public long getCreationDateEpoch() {
return creationDateEpoch;
}
}
....
....
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import org.greencheek.caching.herdcache.CacheWithExpiry;
import java.util.concurrent.Future;
import java.util.function.Predicate;
public class BackEndRequest extends HystrixCommand<Content> {
// Static fallback
private static final Content FALLBACK = new Content("{}");
// If the value returned by the computation is the FALLBACK, do not cache
Predicate<Content> backendComputationValueCachable = (Content value) -> value != FALLBACK;
// If the returned cached value is older than 500ms, do not use the value. Instead re-calculate it by calling
// the backend command
Predicate<Content> cachedValueAllowed = (Content value) -> System.currentTimeMillis() - value.getCreationDateEpoch() < 500;
private final String key;
private final HttpRestClient client;
private final CacheWithExpiry<Content> cache;
public BackEndRequest(String key, HttpRestClient client, CacheWithExpiry<Content> cache) {
super(HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("BackEnd"))
.andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(10)
.withMaxQueueSize(1000))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(1000)
.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
.withExecutionIsolationThreadInterruptOnTimeout(true)));
this.key = key;
this.client = client;
this.cache = cache;
}
@Override
protected Content run() throws Exception {
Future<Content> content = cache.apply(key,
() -> client.get(key),
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService(),
org.greencheek.caching.herdcache.Cache.CAN_ALWAYS_CACHE_VALUE, cachedValueAllowed);
Content c = content.get();
if(c==null) {
throw new RuntimeException("failed to obtain key: " + key);
} else {
return c;
}
}
@Override
protected Content getFallback() {
Content content = null;
try {
content = cache.get(key).get();
if(content == null) {
return FALLBACK;
} else {
return content;
}
} catch (Exception e) {
return FALLBACK;
}
}
}
Egon Spengler: There's something very important I forgot to tell you. Peter Venkman: What? Spengler: Don't cross the streams. Venkman: Why? Spengler: It would be bad. Venkman: I'm fuzzy on the whole good/bad thing. What do you mean, "bad"? Spengler: Try to imagine all life as you know it stopping instantaneously and every molecule in your body exploding at the speed of light. Ray Stantz: Total protonic reversal! Venkman: Right. That's bad. Okay. All right. Important safety tip. Thanks, Egon.
If your Supplier<V> is a HystrixCommand, and you have coded its fallback method to take a reference to the Cache object and perform a cache.get() of the same key for which the cache apply is running and executing the HystrixCommand, then you are guaranteed failure.
For example, do not do this in a HystrixCommand for the same key that the cache apply is running for:
@Override
protected CacheableItemWithCreationDate<V> getFallback() {
CacheableItemWithCreationDate<V> contentObj = null;
try {
contentObj = cache.get(cacheKey).get();
} catch (Throwable e) {
e.printStackTrace();
}
return contentObj;
}
The reason for this is that your HystrixCommand's execute() method is running as the implementation of the Supplier<V> interface, and execute() in turn runs your command's run() method. The HystrixCommand (Supplier<V>) is executing in an (asynchronous) Future<V> within herdcache. This Future<V> is stored in an internal map in herdcache as a thundering herd mechanism (https://github.com/tootedom/herdcache#overview), under the key you are looking up.
The Future<V> exists in the internal map, keyed on the given key, until the Supplier<V> command returns a value. For a HystrixCommand's execute() method, the resulting value is either from the command's run() method or its getFallback().
The problem here is that if you call cache.get(key) in your command's getFallback(), you are still "effectively" in the execute() method, and the Future<V> is not yet complete (it is still waiting to generate a value from run() or getFallback()).
The cache.get(cacheKey).get() call will be waiting on exactly the same Future object that was created by herdcache when it initially executed the Supplier<V> (the HystrixCommand's execute() method), because cache.get(cacheKey) checks the internal map for an already executing Future that is mapped to the given key.
In other words:
- The Supplier<V> (the HystrixCommand's run()) is executing in a Future, say Future X. This Future X is stored in a Map<String,Future> internally in herdcache, keyed on PID_XYZ.
- If run() fails, this will result in the HystrixCommand's getFallback() being called.
- If getFallback() invokes cache.get("PID_XYZ"), then the future previously stored in the Map, keyed on "PID_XYZ", is returned from the Map.
- This is the same Future that is executing run().
- As a result what you effectively have is a loop.
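The loop can be reproduced in a few lines with JDK classes. In this sketch (FallbackLoopSketch is a hypothetical name, and CompletableFuture stands in for the future herdcache stores), the code that is supposed to complete Future X first waits on Future X. A timeout is used so the example terminates; an untimed get() would block forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demonstration of a computation waiting on its own future.
final class FallbackLoopSketch {
    static String demonstrate() {
        ConcurrentMap<String, CompletableFuture<String>> inFlight = new ConcurrentHashMap<>();

        // Future X: registered under the key before the computation starts.
        CompletableFuture<String> futureX = new CompletableFuture<>();
        inFlight.put("PID_XYZ", futureX);

        // Stand-in for run() failing and getFallback() calling cache.get("PID_XYZ").get():
        // the lookup returns Future X, which cannot complete until this code returns.
        String result;
        try {
            result = inFlight.get("PID_XYZ").get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            result = "timed out waiting on itself";
        } catch (Exception e) {
            result = "failed: " + e;
        }

        // Only now does the computation finish and complete Future X.
        futureX.complete(result);
        return futureX.join();
    }
}
```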
This is technically an alternate implementation of the stale-while-revalidate pattern. This feature is available in herdcache version 1.0.26, and is implemented within the interface RevalidateInBackgroundCapableCache. This interface extends the CacheWithExpiry interface with an additional parameter, returnInvalidCachedItemWhileRevalidate, on the apply(…) method:
public ListenableFuture<V> apply(String key,
Supplier<V> computation,
Duration timeToLive,
ListeningExecutorService executorService,
Predicate<V> canCacheValueEvalutor,Predicate<V> isCachedValueValid,
boolean returnInvalidCachedItemWhileRevalidate);
public ListenableFuture<V> apply(String key,
Supplier<V> computation,
ListeningExecutorService executorService,
Predicate<V> canCacheValueEvalutor,Predicate<V> isCachedValueValid,
boolean returnInvalidCachedItemWhileRevalidate);
The flow of the apply method, when returnInvalidCachedItemWhileRevalidate is true, is as follows:
- An item is found in the cache.
- The item is passed to the Predicate<V> isCachedValueValid.
- If the isCachedValueValid predicate returns true, the cached value is returned.
- However, if the isCachedValueValid predicate returns false, the "invalid" value is returned (set on the Future<V>), while the Supplier<V> computation is submitted to the executorService in order to refresh the item in the cache.
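The flow above can be sketched as follows. This is a simplified illustration and not the herdcache implementation: the class StaleWhileRevalidateSketch is hypothetical, an in-memory map stands in for memcached, CompletableFuture stands in for ListenableFuture, and a plain Executor stands in for the ListeningExecutorService. Treating an invalid item as a cache miss when the flag is false is also an assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Illustrative only: an in-memory map stands in for memcached.
class StaleWhileRevalidateSketch<V> {
    private final ConcurrentMap<String, V> store = new ConcurrentHashMap<>();

    CompletableFuture<V> apply(String key,
                               Supplier<V> computation,
                               Executor executorService,
                               Predicate<V> isCachedValueValid,
                               boolean returnInvalidCachedItemWhileRevalidate) {
        V cached = store.get(key);
        if (cached != null) {
            if (isCachedValueValid.test(cached)) {
                // Valid item: return it as is.
                return CompletableFuture.completedFuture(cached);
            }
            if (returnInvalidCachedItemWhileRevalidate) {
                // Invalid item: hand it back now, refresh the cache in the background.
                CompletableFuture.runAsync(
                        () -> store.put(key, computation.get()), executorService);
                return CompletableFuture.completedFuture(cached);
            }
        }
        // Miss (or, by assumption, an invalid item without the flag):
        // compute, store and return the fresh value.
        return CompletableFuture.supplyAsync(() -> {
            V fresh = computation.get();
            store.put(key, fresh);
            return fresh;
        }, executorService);
    }

    public static void main(String[] args) {
        StaleWhileRevalidateSketch<Integer> cache = new StaleWhileRevalidateSketch<>();
        Executor sameThread = Runnable::run; // runs tasks inline, keeps the demo deterministic

        // Cache miss: the value is computed and stored.
        System.out.println(cache.apply("k", () -> 1, sameThread, v -> false, true).join()); // prints 1
        // Cached 1 is "invalid": it is still returned, while 2 is stored by the refresh.
        System.out.println(cache.apply("k", () -> 2, sameThread, v -> false, true).join()); // prints 1
        // Cached 2 is valid: it is returned and the supplier is not run.
        System.out.println(cache.apply("k", () -> 3, sameThread, v -> true, true).join());  // prints 2
    }
}
```

The sketch does not de-duplicate concurrent refreshes for the same key and does not support null values; it only shows the order of the decisions.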
Below is a simple Java main class that can be run from the command line as follows. With -DmaxRand=1000 it generates a random integer between 1 and 1000 and applies that value to the cache.
java -DmaxRand=1000 -Dmillis=500 -Dhosts=herdtesting.jgkygp.cfg.euw1.cache.amazonaws.com:11211 -jar herd-elastitest-0.1.0-SNAPSHOT-relocated-shade.jar
With the given logback.xml, the console output shows whether a cache hit or a cache miss occurred:
71671 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachehit" : "-778756949", "cachetype" : "distributed_cache"}
71671 [pool-1-thread-1] INFO ElastiCacheTest - Adding cache value : 633
71680 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachehit" : "274176478", "cachetype" : "distributed_cache"}
71680 [pool-1-thread-1] INFO ElastiCacheTest - Adding cache value : 35
71690 [pool-1-thread-1] DEBUG MemcachedCacheHitsLogger - { "cachemiss" : "65783974", "cachetype" : "distributed_cache"}
71690 [pool-1-thread-1] DEBUG o.g.c.h.m.BaseMemcachedCache - set requested for 65783974
71691 [pool-1-thread-1] INFO ElastiCacheTest - Adding cache value : 107
package org.greencheek.caching.elasticache;

import com.google.common.util.concurrent.MoreExecutors;
import net.spy.memcached.ConnectionFactoryBuilder;
import org.greencheek.caching.herdcache.CacheWithExpiry;
import org.greencheek.caching.herdcache.memcached.ElastiCacheMemcachedCache;
import org.greencheek.caching.herdcache.memcached.config.builder.ElastiCacheCacheConfigBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Periodically generates a random integer and applies it to an ElastiCache backed cache.
 */
public class ElastiCacheTest {

    private static final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
    private static final Logger logger = LoggerFactory.getLogger("ElastiCacheTest");

    private static final CacheWithExpiry cache = new ElastiCacheMemcachedCache<Integer>(
            new ElastiCacheCacheConfigBuilder()
                    .setElastiCacheConfigHosts(System.getProperty("hosts","localhost:11211"))
                    .setConfigPollingTime(Duration.ofSeconds(Integer.getInteger("pollingTime",60)))
                    .setInitialConfigPollingDelay(Duration.ofSeconds(0))
                    .setTimeToLive(Duration.ofSeconds(10))
                    .setProtocol(ConnectionFactoryBuilder.Protocol.TEXT)
                    .setWaitForMemcachedSet(true)
                    .setDelayBeforeClientClose(Duration.ofSeconds(1))
                    .setDnsConnectionTimeout(Duration.ofSeconds(2))
                    .setUseStaleCache(true)
                    .setStaleCacheAdditionalTimeToLive(Duration.ofSeconds(4))
                    .setRemoveFutureFromInternalCacheBeforeSettingValue(true)
                    .buildElastiCacheMemcachedConfig());

    public static void main(String[] args) {
        // Every "millis" milliseconds, pick a random key/value and apply it to the cache.
        service.scheduleAtFixedRate(()-> {
                    int i = randInt(Integer.getInteger("minRand",1),Integer.getInteger("maxRand",2));
                    logger.info("Adding cache value : {}",cache.awaitForFutureOrElse(
                            cache.apply(""+i,() -> { return i; },
                                    MoreExecutors.sameThreadExecutor()
                            ),
                            MoreExecutors.sameThreadExecutor()),"null");
                }
                ,0,Integer.getInteger("millis",1000),TimeUnit.MILLISECONDS);
    }

    public static int randInt(int min,int max) {
        // NOTE: Usually this should be a field rather than a method
        // variable so that it is not re-seeded every call.
        Random rand = new Random();

        // nextInt is normally exclusive of the top value,
        // so add 1 to make it inclusive
        return rand.nextInt((max - min) + 1) + min;
    }
}
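The NOTE in randInt points out that creating a new Random on every call is not ideal. One possible alternative, shown here as a sketch rather than as part of the test class, is the JDK's ThreadLocalRandom, which needs no field at all. The class name RandIntSketch is hypothetical.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of herdcache or the test class above.
class RandIntSketch {
    // Returns a value between min and max, inclusive of both.
    // nextInt's upper bound is exclusive, hence max + 1
    // (so max must be smaller than Integer.MAX_VALUE).
    static int randInt(int min, int max) {
        return ThreadLocalRandom.current().nextInt(min, max + 1);
    }

    public static void main(String[] args) {
        System.out.println(randInt(1, 1000));
    }
}
```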
<configuration scan="true" scanPeriod="120 seconds" >
    <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
        <resetJUL>true</resetJUL>
    </contextListener>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type
             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
        <encoder>
            <pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
        </encoder>
    </appender>
    <logger name="net.spy" level="WARN"/>
    <root level="DEBUG">
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
To compile and run the performance tests:
mvn clean test-compile assembly:single
$JAVA_HOME/bin/java -jar target/performancetests-test-jar-with-dependencies.jar
Example output:
Benchmark                                                      Mode   Cnt    Score    Error  Units
PerfTestApplyCommand.applyDefaultKetamaHashAlgoTest            thrpt   40   45.778 ±  4.248  ops/ms
PerfTestApplyCommand.applyDefaultKetamaHashAlgoTestLargeValue  thrpt   40   38.663 ± 11.279  ops/ms
PerfTestApplyCommand.applyFolsomTest                           thrpt   40   37.213 ±  4.314  ops/ms
PerfTestApplyCommand.applyFolsomTestLargeValue                 thrpt   40   33.782 ±  6.222  ops/ms
PerfTestApplyCommand.applyJenkinsHashAlgoTest                  thrpt   40   49.804 ±  8.375  ops/ms
PerfTestApplyCommand.applyJenkinsHashAlgoTestLargeValue        thrpt   40   43.057 ± 10.184  ops/ms
PerfTestApplyCommand.applyNative64XXHashAlgoTest               thrpt   40   47.586 ±  5.329  ops/ms
PerfTestApplyCommand.applyNative64XXHashAlgoTestLargeValue     thrpt   40   38.698 ±  8.471  ops/ms
PerfTestApplyCommand.applyNoKeyHashingJenkinsTest              thrpt   40   56.266 ± 12.331  ops/ms
PerfTestApplyCommand.applyNoKeyHashingJenkinsTestLargeValue    thrpt   40   60.013 ± 22.869  ops/ms
PerfTestApplyCommand.applySHA256HashingJenkinsTest             thrpt   40   43.280 ±  1.106  ops/ms
PerfTestApplyCommand.applySHA256HashingJenkinsTestLargeValue   thrpt   40   31.405 ±  6.456  ops/ms
PerfTestApplyCommand.applyXXHashAlgoTest                       thrpt   40   45.088 ±  3.099  ops/ms
PerfTestApplyCommand.applyXXHashAlgoTestLargeValue             thrpt   40   34.139 ±  6.772  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Compresss           thrpt   40   72.708 ±  2.179  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.iq80Decompresss         thrpt   40  162.476 ±  3.815  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialCompress          thrpt   40  105.303 ±  2.542  ops/ms
o.g.c.h.p.b.compression.SnappyPerfTest.xerialDecompress        thrpt   40  173.413 ±  6.217  ops/ms
LZ4PerfTest.compress                                           thrpt   40  138.716 ±  1.626  ops/ms
LZ4PerfTest.compressAndDecompress                              thrpt   40   98.266 ±  1.031  ops/ms
SnappyCompressionPerfTest.compress                             thrpt   40  109.232 ±  4.480  ops/ms
SnappyCompressionPerfTest.compressAndDecompress                thrpt   40   72.289 ±  0.820  ops/ms