Cache Eviction Algorithms
- LFU (Least Frequently Used)
Evicts the data with the lowest access frequency. The core assumption: if an entry was accessed many times in the past, it is likely to be accessed again. The algorithm must maintain per-entry frequency counts, which is costly, and it adapts poorly to sudden bursts of new hot keys.
- LRU (Least Recently Used)
Evicts the data that has gone unaccessed the longest. The core assumption: if an entry was accessed recently, it is likely to be accessed again soon. Its weakness is that occasional bulk operations flush out the hot set and lower the hit rate. Guava Cache uses this algorithm.
- TinyLFU
A lightweight LFU that uses much less memory to track access frequencies. TinyLFU keeps recent access frequencies rather than frequencies over an entry's whole lifetime, so it copes well with bursty hot keys. The frequency records act as an admission filter: a new entry is admitted to the cache only if its access frequency is higher than that of the entry it would replace.
TinyLFU records frequencies with a Count-Min Sketch, which is compact and has a low error rate. Under sparse bursts of traffic, however, new entries may not accumulate enough accesses in a short window to pass the filter and get rejected.
- W-TinyLFU
Optimizes TinyLFU for the sparse-burst case: W-TinyLFU first places new entries in a Window Cache, and only entries that pass the TinyLFU filter are promoted into the Main Cache.
The filter uses the Count-Min Sketch algorithm (a variant of the Bloom filter): one counter array is kept per hash function; each access hashes the key with every function and increments the counter at the resulting index of the corresponding array. Since hashes collide, the frequency estimate is read back as the minimum counter value across all the arrays (see the sketch below).
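For illustration, here is a minimal, self-contained Count-Min Sketch in Java. This is not Caffeine's implementation (its optimized FrequencySketch is covered later); the seeds and hash mixing are arbitrary choices for this sketch.

import java.util.Random;

/** A minimal Count-Min Sketch: depth hash functions over depth rows of width counters. */
class CountMinSketch {
    private final int depth;       // number of hash functions / rows
    private final int width;       // counters per row
    private final long[][] table;
    private final int[] seeds;     // one seed per hash function

    CountMinSketch(int depth, int width) {
        this.depth = depth;
        this.width = width;
        this.table = new long[depth][width];
        this.seeds = new Random(42).ints(depth).toArray();
    }

    private int indexOf(Object item, int row) {
        int h = item.hashCode() ^ seeds[row];
        h ^= (h >>> 16);                 // spread the hash bits
        return Math.floorMod(h, width);
    }

    /** Record one access: bump the item's counter in every row. */
    void increment(Object item) {
        for (int row = 0; row < depth; row++) {
            table[row][indexOf(item, row)]++;
        }
    }

    /** Collisions can only inflate counters, so the minimum across rows is the estimate. */
    long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int row = 0; row < depth; row++) {
            min = Math.min(min, table[row][indexOf(item, row)]);
        }
        return min;
    }
}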
A Brief Introduction to Caffeine
Compared with Guava Cache, Caffeine adopts the better W-TinyLFU eviction policy and records cache accesses in a RingBuffer for batched asynchronous processing. Moreover, Caffeine is built directly on JDK 1.8's ConcurrentHashMap (hence Caffeine requires Java 8+), which is considerably faster than the 1.7 version (Guava Cache's internals resemble the 1.7 ConcurrentHashMap); the project's official benchmarks show its performance approaching a bare JDK 1.8 ConcurrentHashMap.
Caffeine's API closely mirrors Guava Cache's, so the switching cost is low.
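As a quick illustration, here is a minimal loading-cache sketch (the key scheme and loader are invented for this example):

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.time.Duration;

public class CaffeineExample {
    public static void main(String[] args) {
        LoadingCache<String, String> cache = Caffeine.newBuilder()
            .maximumSize(10_000)                       // W-TinyLFU eviction applies at this size
            .expireAfterWrite(Duration.ofMinutes(5))   // entries expire 5 minutes after a write
            .recordStats()                             // enable hit/miss statistics
            .build(key -> "value-for-" + key);         // loader invoked on a miss

        System.out.println(cache.get("k1"));           // miss: loads, caches, returns
        System.out.println(cache.get("k1"));           // hit: served from the cache
        System.out.println(cache.stats());
    }
}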
Read/Write Data Structures
Unlike Guava Cache, which records reads and writes with its accessQueue, recencyQueue, and writeQueue, Caffeine uses a readBuffer and a writeBuffer.
readBuffer
Reads are recorded in multiple RingBuffers (a striped ring buffer, which is lossy); each thread is hashed by its id to one of the buffers. When a RingBuffer is full, subsequent writes to it are dropped until it has room again.
On a read hit, the entry is written into the RingBuffer, which is a very cheap operation. A thread pool (ForkJoin by default, or a user-supplied executor) then asynchronously drains the RingBuffer and feeds the entries into the eviction queues.
writeBuffer
Implemented by MpscGrowableArrayQueue, much like the one in JCTools. Writes differ from reads: reads far outnumber writes, and write events must not be lossy (no dropped data). For MPSC queues, see [netty study notes 17] on the Mpsc high-performance lock-free queue, and the sketch below.
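As a rough illustration of the multi-producer/single-consumer contract, here is a sketch using JCTools' class of the same name (the queue sizes and tasks are arbitrary for this example):

import org.jctools.queues.MpscGrowableArrayQueue;

public class MpscDemo {
    public static void main(String[] args) throws InterruptedException {
        // Grows from 16 up to a hard cap of 1024; offer() returns false only
        // when the cap is reached, so there is no silent loss below capacity.
        MpscGrowableArrayQueue<Runnable> queue = new MpscGrowableArrayQueue<>(16, 1024);

        // Many producer threads may offer concurrently...
        for (int i = 0; i < 4; i++) {
            final int id = i;
            new Thread(() -> queue.offer(() -> System.out.println("task from producer " + id))).start();
        }

        Thread.sleep(100);
        // ...but only a single consumer thread may poll.
        Runnable task;
        while ((task = queue.poll()) != null) {
            task.run();
        }
    }
}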
Custom Expiration Policies
Besides expireAfterAccess and expireAfterWrite, Caffeine also supports expireAfter, which assigns each key its own expiration time. This is implemented with a timer wheel; see [netty study notes 18] on netty's timer wheel, and the example below.
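A minimal sketch of the variable-expiration API (the per-key durations and the "hot:" key prefix are invented for illustration):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import java.util.concurrent.TimeUnit;

public class VariableExpiryExample {
    public static void main(String[] args) {
        Cache<String, String> cache = Caffeine.newBuilder()
            .expireAfter(new Expiry<String, String>() {
                @Override
                public long expireAfterCreate(String key, String value, long currentTime) {
                    // e.g. "hot" keys live 10 minutes, everything else 1 minute
                    long minutes = key.startsWith("hot:") ? 10 : 1;
                    return TimeUnit.MINUTES.toNanos(minutes);
                }
                @Override
                public long expireAfterUpdate(String key, String value,
                        long currentTime, long currentDuration) {
                    return currentDuration; // keep the remaining lifetime on update
                }
                @Override
                public long expireAfterRead(String key, String value,
                        long currentTime, long currentDuration) {
                    return currentDuration; // reads do not extend the lifetime
                }
            })
            .build();

        cache.put("hot:user1", "cached for ~10 minutes");
        cache.put("misc", "cached for ~1 minute");
    }
}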
A Brief Look at the Source
Start with the get method, which eventually calls computeIfAbsent:
public @Nullable V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction,
boolean recordStats, boolean recordLoad) {
long now = expirationTicker().read();
// data is the underlying ConcurrentHashMap
Node<K, V> node = data.get(nodeFactory.newLookupKey(key));
if (node != null) {
V value = node.getValue();
// if the key exists and has not expired (expiration here covers all the expire* settings), return its value
if ((value != null) && !hasExpired(node, now)) {
if (!isComputingAsync(node)) {
tryExpireAfterRead(node, key, value, expiry(), now);
setAccessTime(node, now);
}
afterRead(node, now, /* recordHit */ recordStats);
return value;
}
}
if (recordStats) {
mappingFunction = statsAware(mappingFunction, recordLoad);
}
Object keyRef = nodeFactory.newReferenceKey(key, keyReferenceQueue());
return doComputeIfAbsent(key, keyRef, mappingFunction, new long[] { now }, recordStats);
}
/** Returns the current value from a computeIfAbsent invocation. */
@Nullable V doComputeIfAbsent(K key, Object keyRef,
Function<? super K, ? extends V> mappingFunction, long[/* 1 */] now, boolean recordStats) {
@SuppressWarnings("unchecked")
V[] oldValue = (V[]) new Object[1];
@SuppressWarnings("unchecked")
V[] newValue = (V[]) new Object[1];
@SuppressWarnings("unchecked")
K[] nodeKey = (K[]) new Object[1];
@SuppressWarnings({"unchecked", "rawtypes"})
Node<K, V>[] removed = new Node[1];
int[] weight = new int[2]; // old, new
RemovalCause[] cause = new RemovalCause[1];
Node<K, V> node = data.compute(keyRef, (k, n) -> {
// n == null means no entry exists; n != null means an entry exists but may have expired
if (n == null) {
// this ultimately calls the user-defined loading function
newValue[0] = mappingFunction.apply(key);
if (newValue[0] == null) {
return null;
}
now[0] = expirationTicker().read();
weight[1] = weigher.weigh(key, newValue[0]);
n = nodeFactory.newNode(key, keyReferenceQueue(),
newValue[0], valueReferenceQueue(), weight[1], now[0]);
setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
return n;
}
// lock the node; if n has expired, other threads wait until it is reloaded
synchronized (n) {
nodeKey[0] = n.getKey();
weight[0] = n.getWeight();
oldValue[0] = n.getValue();
if ((nodeKey[0] == null) || (oldValue[0] == null)) {
cause[0] = RemovalCause.COLLECTED;
} else if (hasExpired(n, now[0])) {
cause[0] = RemovalCause.EXPIRED;
} else {
return n;
}
// remove the stale value and reload it
writer.delete(nodeKey[0], oldValue[0], cause[0]);
newValue[0] = mappingFunction.apply(key);
if (newValue[0] == null) {
removed[0] = n;
n.retire();
return null;
}
weight[1] = weigher.weigh(key, newValue[0]);
n.setValue(newValue[0], valueReferenceQueue());
n.setWeight(weight[1]);
now[0] = expirationTicker().read();
setVariableTime(n, expireAfterCreate(key, newValue[0], expiry(), now[0]));
setAccessTime(n, now[0]);
setWriteTime(n, now[0]);
return n;
}
});
if (node == null) {
if (removed[0] != null) {
afterWrite(new RemovalTask(removed[0]));
}
return null;
}
if (cause[0] != null) {
// notify listeners of the old value's removal
if (hasRemovalListener()) {
notifyRemoval(nodeKey[0], oldValue[0], cause[0]);
}
statsCounter().recordEviction(weight[0]);
}
// newValue[0] == null: the mapping function never ran because the existing value was still valid, so treat this as a read
if (newValue[0] == null) {
if (!isComputingAsync(node)) {
tryExpireAfterRead(node, key, oldValue[0], expiry(), now[0]);
setAccessTime(node, now[0]);
}
afterRead(node, now[0], /* recordHit */ recordStats);
return oldValue[0];
}
// record the write operation
if ((oldValue[0] == null) && (cause[0] == null)) {
afterWrite(new AddTask(node, weight[1]));
} else {
int weightedDifference = (weight[1] - weight[0]);
afterWrite(new UpdateTask(node, weightedDifference));
}
// return the newly loaded value
return newValue[0];
}
The Difference Between refreshAfterWrite and expireAfterWrite
This differs slightly from Guava: with refreshAfterWrite, a stale entry's old value is returned immediately and the refresh runs on an asynchronous executor (ForkJoin by default). expireAfterWrite behaves the same as Guava's: the new value is loaded synchronously and other threads must wait. The configuration sketch below contrasts the two.
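A small configuration sketch contrasting the two policies (the durations and loader are arbitrary for this example):

import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.time.Duration;

public class RefreshVsExpire {
    public static void main(String[] args) {
        LoadingCache<String, String> cache = Caffeine.newBuilder()
            // after 1 minute, a read returns the old value and triggers an async reload
            .refreshAfterWrite(Duration.ofMinutes(1))
            // after 5 minutes, the entry is expired: the next reader blocks on a fresh load
            .expireAfterWrite(Duration.ofMinutes(5))
            .build(key -> slowLoad(key));

        cache.get("config"); // the first call loads synchronously
    }

    // stand-in for an expensive load (e.g. a DB or RPC call)
    static String slowLoad(String key) {
        return "value-for-" + key;
    }
}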
The refreshAfterWrite path is in BoundedLocalCache#afterRead -> refreshIfNeeded:
void refreshIfNeeded(Node<K, V> node, long now) {
if (!refreshAfterWrite()) {
return;
}
K key;
V oldValue;
long oldWriteTime = node.getWriteTime();
long refreshWriteTime = (now + ASYNC_EXPIRY);
if (((now - oldWriteTime) > refreshAfterWriteNanos())
&& ((key = node.getKey()) != null) && ((oldValue = node.getValue()) != null)
&& node.casWriteTime(oldWriteTime, refreshWriteTime)) {
try {
CompletableFuture<V> refreshFuture;
long startTime = statsTicker().read();
// is this an async cache?
if (isAsync) {
@SuppressWarnings("unchecked")
CompletableFuture<V> future = (CompletableFuture<V>) oldValue;
if (Async.isReady(future)) {
@SuppressWarnings("NullAway")
CompletableFuture<V> refresh = future.thenCompose(value ->
cacheLoader.asyncReload(key, value, executor));
refreshFuture = refresh;
} else {
// no-op if load is pending
node.casWriteTime(refreshWriteTime, oldWriteTime);
return;
}
} else {
@SuppressWarnings("NullAway")
// asynchronously load the new value
CompletableFuture<V> refresh = cacheLoader.asyncReload(key, oldValue, executor);
refreshFuture = refresh;
}
refreshFuture.whenComplete((newValue, error) -> {
long loadTime = statsTicker().read() - startTime;
if (error != null) {
logger.log(Level.WARNING, "Exception thrown during refresh", error);
node.casWriteTime(refreshWriteTime, oldWriteTime);
statsCounter().recordLoadFailure(loadTime);
return;
}
@SuppressWarnings("unchecked")
V value = (isAsync && (newValue != null)) ? (V) refreshFuture : newValue;
boolean[] discard = new boolean[1];
compute(key, (k, currentValue) -> {
if (currentValue == null) {
return value;
} else if ((currentValue == oldValue) && (node.getWriteTime() == refreshWriteTime)) {
return value;
}
discard[0] = true;
return currentValue;
}, /* recordMiss */ false, /* recordLoad */ false, /* recordLoadFailure */ true);
if (discard[0] && hasRemovalListener()) {
notifyRemoval(key, value, RemovalCause.REPLACED);
}
if (newValue == null) {
statsCounter().recordLoadFailure(loadTime);
} else {
statsCounter().recordLoadSuccess(loadTime);
}
});
} catch (Throwable t) {
node.casWriteTime(refreshWriteTime, oldWriteTime);
logger.log(Level.SEVERE, "Exception thrown when submitting refresh task", t);
}
}
}
That covers refreshAfterWrite's asynchronous reload. expireAfterWrite, by contrast, is checked in hasExpired; once an entry has expired, doComputeIfAbsent is invoked to load a new value.
boolean hasExpired(Node<K, V> node, long now) {
return (expiresAfterAccess() && (now - node.getAccessTime() >= expiresAfterAccessNanos()))
| (expiresAfterWrite() && (now - node.getWriteTime() >= expiresAfterWriteNanos()))
| (expiresVariable() && (now - node.getVariableTime() >= 0));
}
The Count-Min Sketch Implementation
As described above, Caffeine implements its eviction policy with W-TinyLFU, whose admission filter uses the Count-Min Sketch algorithm. Caffeine's implementation lives in the FrequencySketch class and adds a few optimizations.
FrequencySketch records access frequencies in a long array whose length is the smallest power of two greater than the cache size. FrequencySketch caps a frequency at 15, which is 4 bits in binary, so one 64-bit long can in principle hold 16 counters. Caffeine indeed divides each long into 16 slots of 4 bits, each storing one frequency. The layout of a long looks like this:
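bits:  63..60 | 59..56 | 55..52 |  ...  | 7..4  | 3..0
slot:    15   |   14   |   13   |  ...  |   1   |   0
(each 4-bit slot holds one frequency counter, maximum value 15)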
Now look at the code that records one access:
public void increment(@NonNull E e) {
if (isNotInitialized()) {
return;
}
// like JDK's HashMap, spread() mixes the hash bits for a more uniform distribution
int hash = spread(e.hashCode());
// pick a number below 16 (0, 4, 8, or 12): the starting 4-bit slot within the long
int start = (hash & 3) << 2;
// four hash functions (different seeds) compute four different indexes into table (the long array)
int index0 = indexOf(hash, 0);
int index1 = indexOf(hash, 1);
int index2 = indexOf(hash, 2);
int index3 = indexOf(hash, 3);
// increment the 4-bit counters: slot start of table[index0], start+1 of table[index1], start+2 of table[index2], start+3 of table[index3]
boolean added = incrementAt(index0, start);
added |= incrementAt(index1, start + 1);
added |= incrementAt(index2, start + 2);
added |= incrementAt(index3, start + 3);
// aging: deals with entries whose counts are high but which are no longer accessed
if (added && (++size == sampleSize)) {
reset();
}
}
boolean incrementAt(int i, int j) {
// j is the 4-bit slot index (0-15); offset is its bit position within the 64-bit long
int offset = j << 2;
// mask selects the slot to test whether it is already at the maximum of 15
long mask = (0xfL << offset);
if ((table[i] & mask) != mask) {
// below 15, so increment by 1
table[i] += (1L << offset);
return true;
}
return false;
}
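For completeness, the read path takes the minimum of the four counters, matching the Count-Min idea described earlier. Below is a lightly simplified rendering of FrequencySketch#frequency:

public int frequency(@NonNull E e) {
  if (isNotInitialized()) {
    return 0;
  }
  int hash = spread(e.hashCode());
  int start = (hash & 3) << 2;
  int frequency = Integer.MAX_VALUE;
  // read the 4-bit counter from each of the four indexed longs and keep the
  // minimum, since hash collisions can only inflate a counter, never shrink it
  for (int i = 0; i < 4; i++) {
    int index = indexOf(hash, i);
    int count = (int) ((table[index] >>> ((start + i) << 2)) & 0xfL);
    frequency = Math.min(frequency, count);
  }
  return frequency;
}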
Keeping Frequencies Fresh
What about entries whose counts are high but which are rarely used now? They cannot occupy the long array forever. Caffeine has an aging mechanism: when the total number of recorded accesses reaches a threshold (by default 10x maximum, the sampleSize), every counter is halved.
if (added && (++size == sampleSize)) {
reset();
}
/** Reduces every counter by half of its original value. */
void reset() {
int count = 0;
for (int i = 0; i < table.length; i++) {
// ONE_MASK (0x1111...) selects the low bit of every 4-bit slot, counting how many counters are odd (each loses 0.5 when halved)
count += Long.bitCount(table[i] & ONE_MASK);
// shift all 16 counters right by one; RESET_MASK (0x7777...) clears the bit that would bleed in from the neighboring slot
table[i] = (table[i] >>> 1) & RESET_MASK;
}
// halve the sample size, compensating for the rounding of the odd counters
size = (size >>> 1) - (count >>> 2);
}
Window
As noted above, TinyLFU performs poorly under sparse bursty traffic: new entries struggle to accumulate enough frequency to pass the filter. Caffeine improves on this by introducing Window TinyLFU (W-TinyLFU).
When the window region is full, LRU moves a candidate entry into the probation region. If probation is also full, the candidate competes against probation's eviction victim, and the loser is evicted.
By default Caffeine sizes the window at 1% of capacity; of the remaining main region, 80% is Protected and 20% is Probation (experiments showed this split performs best). The proportions are also adjusted dynamically at runtime.
The eviction code is as follows:
/** Evicts entries if the cache exceeds the maximum. */
@GuardedBy("evictionLock")
void evictEntries() {
if (!evicts()) {
return;
}
// evict entries from the window region
int candidates = evictFromWindow();
// evict entries from the main region
evictFromMain(candidates);
}
/**
* Evicts entries from the window space into the main space while the window size exceeds a
* maximum.
*
* @return the number of candidate entries evicted from the window space
*/
@GuardedBy("evictionLock")
int evictFromWindow() {
int candidates = 0;
// peek at the head node of the window queue
Node<K, V> node = accessOrderWindowDeque().peek();
// keep going while the window exceeds its maximum
while (windowWeightedSize() > windowMaximum()) {
// The pending operations will adjust the size to reflect the correct weight
if (node == null) {
break;
}
Node<K, V> next = node.getNextInAccessOrder();
if (node.getWeight() != 0) {
node.makeMainProbation();
// move the node out of the window region into probation
accessOrderWindowDeque().remove(node);
accessOrderProbationDeque().add(node);
candidates++;
// adjust the window's weighted size
setWindowWeightedSize(windowWeightedSize() - node.getPolicyWeight());
}
node = next;
}
return candidates;
}
@GuardedBy("evictionLock")
void evictFromMain(int candidates) {
int victimQueue = PROBATION;
// victim is probation's head (least recently used); candidate is its tail (most recently admitted)
Node<K, V> victim = accessOrderProbationDeque().peekFirst();
Node<K, V> candidate = accessOrderProbationDeque().peekLast();
// evict while the cache is over capacity
while (weightedSize() > maximum()) {
// Stop trying to evict candidates and always prefer the victim
if (candidates == 0) {
candidate = null;
}
// Try evicting from the protected and window queues
// fall back to the protected, then the window, queue for a victim
if ((candidate == null) && (victim == null)) {
if (victimQueue == PROBATION) {
victim = accessOrderProtectedDeque().peekFirst();
victimQueue = PROTECTED;
continue;
} else if (victimQueue == PROTECTED) {
victim = accessOrderWindowDeque().peekFirst();
victimQueue = WINDOW;
continue;
}
// The pending operations will adjust the size to reflect the correct weight
break;
}
// Skip over entries with zero weight
if ((victim != null) && (victim.getPolicyWeight() == 0)) {
victim = victim.getNextInAccessOrder();
continue;
} else if ((candidate != null) && (candidate.getPolicyWeight() == 0)) {
candidate = candidate.getPreviousInAccessOrder();
candidates--;
continue;
}
// Evict immediately if only one of the entries is present
if (victim == null) {
@SuppressWarnings("NullAway")
Node<K, V> previous = candidate.getPreviousInAccessOrder();
Node<K, V> evict = candidate;
candidate = previous;
candidates--;
evictEntry(evict, RemovalCause.SIZE, 0L);
continue;
} else if (candidate == null) {
Node<K, V> evict = victim;
victim = victim.getNextInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L);
continue;
}
// Evict immediately if an entry was collected
K victimKey = victim.getKey();
K candidateKey = candidate.getKey();
if (victimKey == null) {
@NonNull Node<K, V> evict = victim;
victim = victim.getNextInAccessOrder();
evictEntry(evict, RemovalCause.COLLECTED, 0L);
continue;
} else if (candidateKey == null) {
candidates--;
@NonNull Node<K, V> evict = candidate;
candidate = candidate.getPreviousInAccessOrder();
evictEntry(evict, RemovalCause.COLLECTED, 0L);
continue;
}
// Evict immediately if the candidate's weight exceeds the maximum
// a candidate heavier than the cache maximum is evicted immediately
if (candidate.getPolicyWeight() > maximum()) {
candidates--;
Node<K, V> evict = candidate;
candidate = candidate.getPreviousInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L);
continue;
}
// Evict the entry with the lowest frequency
candidates--;
// admit() compares the recorded frequencies
if (admit(candidateKey, victimKey)) {
Node<K, V> evict = victim;
victim = victim.getNextInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L);
candidate = candidate.getPreviousInAccessOrder();
} else {
Node<K, V> evict = candidate;
candidate = candidate.getPreviousInAccessOrder();
evictEntry(evict, RemovalCause.SIZE, 0L);
}
}
}
// frequency comparison between candidate and victim
@GuardedBy("evictionLock")
boolean admit(K candidateKey, K victimKey) {
int victimFreq = frequencySketch().frequency(victimKey);
int candidateFreq = frequencySketch().frequency(candidateKey);
// the higher frequency wins
if (candidateFreq > victimFreq) {
return true;
// otherwise a candidate with frequency <= 5 loses; this allows for counter halving during aging and keeps candidates from gaming the rule to evict warm entries, protecting the hit rate
} else if (candidateFreq <= 5) {
// The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack
// exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm
// candidate reduces the number of random acceptances to minimize the impact on the hit rate.
return false;
}
// candidate frequency is above 5 but not above the victim's: admit randomly (about 1 in 128)
int random = ThreadLocalRandom.current().nextInt();
return ((random & 127) == 0);
}
High-Performance Reads and Writes
Every read or write to the cache is followed by bookkeeping work, such as:
- checking whether entries have expired;
- maintaining frequency statistics;
- recording the read or write;
- tracking the hit rate.
In Guava, this bookkeeping happens inline with the read or write. Caffeine borrows the WAL (write-ahead log) idea: after performing the operation, it appends a record of it to a buffer and processes the buffer asynchronously later, improving performance.
ReadBuffer
Every read hit runs afterRead:
void afterRead(Node<K, V> node, long now, boolean recordHit) {
if (recordHit) {
statsCounter().recordHits(1);
}
// record the access in the ReadBuffer
boolean delayable = skipReadBuffer() || (readBuffer.offer(node) != Buffer.FULL);
if (shouldDrainBuffers(delayable)) {
scheduleDrainBuffers();
}
refreshIfNeeded(node, now);
}
// offer() first enters StripedBuffer, the parent class of BoundedBuffer
public int offer(E e) {
int mask;
int result = 0;
Buffer<E> buffer;
boolean uncontended = true;
Buffer<E>[] buffers = table;
if ((buffers == null)
|| (mask = buffers.length - 1) < 0
// pick the buffer stripe from the thread's probe (derived from the thread id)
|| (buffer = buffers[getProbe() & mask]) == null
|| !(uncontended = ((result = buffer.offer(e)) != Buffer.FAILED))) {
expandOrRetry(e, uncontended);
}
return result;
}
public int offer(E e) {
long head = readCounter;
long tail = relaxedWriteCounter();
// compute the occupancy; if the buffer is full the offer fails, i.e. data loss is allowed
long size = (tail - head);
if (size >= SPACED_SIZE) {
return Buffer.FULL;
}
// CAS the write counter forward by OFFSET (16)
if (casWriteCounter(tail, tail + OFFSET)) {
// mask (modulo) the counter to get the array index
int index = (int) (tail & SPACED_MASK);
buffer.lazySet(index, e);
return Buffer.SUCCESS;
}
return Buffer.FAILED;
}
Note that the ring buffer holds 16 logical elements by default, yet its backing array has 256 slots. The assumption is 4-byte references (the ring buffer stores references) and 64-byte cache lines, so 16 array slots span one cache line and each stored element gets a cache line to itself. That is why the CAS advances the counter by 16: only one of every 16 array slots holds a real element, trading space for time (avoiding false sharing). The small demo below walks through the index math.
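A tiny self-contained sketch of this spaced indexing (the constants mirror the ones described above; this is an illustration, not Caffeine's class):

public class SpacedIndexDemo {
    static final int SPACED_SIZE = 256;              // backing array length
    static final int SPACED_MASK = SPACED_SIZE - 1;
    static final int OFFSET = 16;                    // 16 refs * 4 bytes = one 64-byte cache line

    public static void main(String[] args) {
        long tail = 0;
        for (int i = 0; i < 5; i++) {
            // consecutive elements land 16 array slots (one cache line) apart
            System.out.println("element " + i + " -> array index " + (tail & SPACED_MASK));
            tail += OFFSET; // indexes: 0, 16, 32, 48, 64
        }
    }
}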
References
https://albenw.github.io/posts/a4ae1aa2/
https://zhouxinghang.github.io/caffeine.html