概述
前面的两篇文章(Guava Cache系列之一:如何加载缓存 和 Guava Cache系列之二:如何回收缓存)介绍了Guava Cache的使用,下面从源码来看一下Guava Cache的设计和实现
源码分析
构造函数
// Constructs the cache's core data structures from the builder's configuration.
// (Excerpt from com.google.common.cache.LocalCache.)
LocalCache(
CacheBuilder<? super K, ? super V> builder, @Nullable CacheLoader<? super K, V> loader) {
// Concurrency level: the smaller of the user-supplied value and the allowed maximum.
// When unset, CacheBuilder stores UNSET_INT (-1) and getConcurrencyLevel() returns
// the default of 4; otherwise it returns the user-supplied value.
concurrencyLevel = Math.min(builder.getConcurrencyLevel(), MAX_SEGMENTS);
// Key/value reference strength; defaults to strong references when not specified.
keyStrength = builder.getKeyStrength();
valueStrength = builder.getValueStrength();
// Equality strategy; for strong references this is Equivalence.equals().
keyEquivalence = builder.getKeyEquivalence();
valueEquivalence = builder.getValueEquivalence();
maxWeight = builder.getMaximumWeight();
weigher = builder.getWeigher();
expireAfterAccessNanos = builder.getExpireAfterAccessNanos();
expireAfterWriteNanos = builder.getExpireAfterWriteNanos();
refreshNanos = builder.getRefreshNanos();
// Listener notified when entries are removed.
removalListener = builder.getRemovalListener();
// If a removal listener was configured, removals are buffered in a queue
// until the listener is invoked; otherwise notifications are discarded.
removalNotificationQueue = (removalListener == NullListener.INSTANCE)
? LocalCache.<RemovalNotification<K, V>>discardingQueue()
: new ConcurrentLinkedQueue<RemovalNotification<K, V>>();
ticker = builder.getTicker(recordsTime());
// Factory for new cache entries, chosen according to the key reference
// strength and whether access/write bookkeeping entries are needed.
entryFactory = EntryFactory.getFactory(keyStrength, usesAccessEntries(), usesWriteEntries());
globalStatsCounter = builder.getStatsCounterSupplier().get();
defaultLoader = loader;
// Initial capacity (default 16), capped at MAXIMUM_CAPACITY.
int initialCapacity = Math.min(builder.getInitialCapacity(), MAXIMUM_CAPACITY);
if (evictsBySize() && !customWeigher()) {
initialCapacity = Math.min(initialCapacity, (int) maxWeight);
}
// Find the lowest power-of-two segmentCount that exceeds concurrencyLevel, unless
// maximumSize/Weight is specified in which case ensure that each segment gets at least 10
// entries. The special casing for size-based eviction is only necessary because that eviction
// happens per segment instead of globally, so too many segments compared to the maximum size
// will result in random eviction behavior.
int segmentShift = 0;
int segmentCount = 1;
// Segment array size derived from the concurrency level: the smallest power of
// two >= concurrencyLevel (4 in the default case).
while (segmentCount < concurrencyLevel
&& (!evictsBySize() || segmentCount * 20 <= maxWeight)) {
++segmentShift;
segmentCount <<= 1;
}
// segmentShift and segmentMask spread entries so they distribute as evenly as
// possible across the segments.
this.segmentShift = 32 - segmentShift;
segmentMask = segmentCount - 1;
// Allocate the segment array (size 4 in the default case).
this.segments = newSegmentArray(segmentCount);
// Per-segment capacity: initialCapacity / segmentCount, rounded up (16 / 4 = 4 here).
int segmentCapacity = initialCapacity / segmentCount;
if (segmentCapacity * segmentCount < initialCapacity) {
++segmentCapacity;
}
// Size of the hash table inside each Segment[i]:
// the smallest power of two >= segmentCapacity (4 here).
// NOTE(review): the original comment claimed "the largest power of two below
// segmentCapacity", but the loop below clearly computes the smallest power of
// two that is >= segmentCapacity.
int segmentSize = 1;
while (segmentSize < segmentCapacity) {
segmentSize <<= 1;
}
// Initialize each segment[i].
// Weight-based eviction is used rarely; the common path is the else branch.
if (evictsBySize()) {
// Ensure sum of segment max weights = overall max weights
long maxSegmentWeight = maxWeight / segmentCount + 1;
long remainder = maxWeight % segmentCount;
for (int i = 0; i < this.segments.length; ++i) {
if (i == remainder) {
maxSegmentWeight--;
}
this.segments[i] =
createSegment(segmentSize, maxSegmentWeight, builder.getStatsCounterSupplier().get());
}
} else {
for (int i = 0; i < this.segments.length; ++i) {
this.segments[i] =
createSegment(segmentSize, UNSET_INT, builder.getStatsCounterSupplier().get());
}
}
}
成员变量简介如下:
static final int MAXIMUM_CAPACITY = 1 << 30:缓存最大容量,该数值必须是2的幂,且不超过2^30
static final int MAX_SEGMENTS = 1 << 16:Segment数组最大容量
static final int CONTAINS_VALUE_RETRIES = 3:containsValue方法的重试次数
static final int DRAIN_THRESHOLD = 0x3F(63):每个segment在更新缓存的最近访问(recency)顺序信息之前,最多可以缓冲的缓存访问操作数量。其做法是先记录下读操作的痕迹(memento),推迟获取锁,直到缓冲数量超过该阈值或发生写操作时才真正加锁处理,以此减少锁竞争
static final int DRAIN_MAX = 16:一次清理操作中,最大移除的entry数量
final int segmentMask:定位segment
final int segmentShift:定位segment,同时让entry分布均匀,尽量平均分布在每个segment[i]中
final Segment<K, V>[] segments:segment数组,每个元素下都是一个HashTable
final int concurrencyLevel:并发程度,用来计算segment数组的大小。segment数组的大小正决定了并发的程度
final Equivalence<Object> keyEquivalence:key比较方式
final Equivalence<Object> valueEquivalence:value比较方式
final Strength keyStrength:key引用类型
final Strength valueStrength:value引用类型
final long maxWeight:最大权重
final Weigher<K, V> weigher:计算每个entry权重的接口
final long expireAfterAccessNanos:一个entry访问后多久过期
final long expireAfterWriteNanos:一个entry写入后多久过期
final long refreshNanos:一个entry写入多久后进行刷新
final Queue<RemovalNotification<K, V>> removalNotificationQueue:移除监听器使用队列
final RemovalListener<K, V> removalListener:entry过期移除或者gc回收(弱引用和软引用)将会通知的监听器
final Ticker ticker:统计时间
final EntryFactory entryFactory:创建entry的工厂
final StatsCounter globalStatsCounter:全局缓存性能统计器(命中、未命中、put成功、失败次数等)
final CacheLoader<? super K, V> defaultLoader:默认的缓存加载器
通过源码可以看出,Guava Cache的初始化和使用到的数据结构和
ConcurrentHashMap
是很相似的。具体可以参考ConcurrentHashMap
get
核心代码逻辑在
com.google.common.cache.LocalCache.Segment#get(K, int, com.google.common.cache.CacheLoader<? super K,V>)
// Looks up {@code key} in this segment; on a live hit records the read and may
// schedule a refresh, on a miss (or expired/collected entry) falls through to a
// locked load. Throws ExecutionException if the loader fails.
// (Excerpt from com.google.common.cache.LocalCache.Segment.)
V get(K key, int hash, CacheLoader<K, V> loader) throws ExecutionException {
checkNotNull(key);
checkNotNull(loader);
try {
if (count != 0) { // read-volatile
// don't call getLiveEntry, which would ignore loading values
ReferenceEntry<K, V> e = getEntry(key, hash);
if (e != null) {
long now = map.ticker.read();
// getLiveValue returns null when the entry is expired or its value
// was collected — either way it does not count as a hit.
V value = getLiveValue(e, now);
if (value != null) {
// Live hit: update recency bookkeeping, count the hit, and return
// (possibly triggering an async refresh if refreshNanos elapsed).
recordRead(e, now);
statsCounter.recordHits(1);
return scheduleRefresh(e, key, hash, value, now, loader);
}
ValueReference<K, V> valueReference = e.getValueReference();
// Another thread is already loading this key: wait for its result
// instead of starting a second load.
if (valueReference.isLoading()) {
return waitForLoadingValue(e, key, valueReference);
}
}
}
// at this point e is either null or expired;
return lockedGetOrLoad(key, hash, loader);
} catch (ExecutionException ee) {
// Unwrap the loader failure: rethrow Errors as ExecutionError and
// RuntimeExceptions as UncheckedExecutionException; otherwise propagate.
Throwable cause = ee.getCause();
if (cause instanceof Error) {
throw new ExecutionError((Error) cause);
} else if (cause instanceof RuntimeException) {
throw new UncheckedExecutionException(cause);
}
throw ee;
} finally {
// Drain buffered recency updates / cleanup work accumulated by reads.
postReadCleanup();
}
}
核心代码在
com.google.common.cache.LocalCache.Segment#lockedGetOrLoad
// Under the segment lock, re-checks the table for {@code key}; returns the value
// if another thread raced us in, otherwise installs a LoadingValueReference and
// loads the value (or waits for an in-flight load).
// (Excerpt from com.google.common.cache.LocalCache.Segment.)
V lockedGetOrLoad(K key, int hash, CacheLoader<? super K, V> loader)
throws ExecutionException {
ReferenceEntry<K, V> e;
ValueReference<K, V> valueReference = null;
LoadingValueReference<K, V> loadingValueReference = null;
boolean createNewEntry = true;
// Acquire the per-segment lock.
lock();
try {
// re-read ticker once inside the lock
long now = map.ticker.read();
preWriteCleanup(now);
int newCount = this.count - 1;
// Hash table backing this segment.
AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
// Table sizes are powers of two, so masking maps the hash onto
// [0, table.length() - 1].
int index = hash & (table.length() - 1);
ReferenceEntry<K, V> first = table.get(index);
// Walk the bucket's collision chain looking for the key.
for (e = first; e != null; e = e.getNext()) {
K entryKey = e.getKey();
if (e.getHash() == hash && entryKey != null
&& map.keyEquivalence.equivalent(key, entryKey)) {
valueReference = e.getValueReference();
// Another thread is loading this key: don't create a new entry,
// just wait for that load to complete (below, outside the lock).
if (valueReference.isLoading()) {
createNewEntry = false;
} else {
V value = valueReference.get();
// Value was reclaimed by GC (possible with weak/soft values).
if (value == null) {
enqueueNotification(entryKey, hash, valueReference, RemovalCause.COLLECTED);
} else if (map.isExpired(e, now)) {
// Entry is past its expiry deadline.
enqueueNotification(entryKey, hash, valueReference, RemovalCause.EXPIRED);
} else {
// Live value: update recency under the lock, count the hit, return.
recordLockedRead(e, now);
statsCounter.recordHits(1);
// we were concurrent with loading; don't consider refresh
return value;
}
// For the collected/expired cases, drop the entry from the write and
// access queues; a successful reload re-enqueues it later.
writeQueue.remove(e);
accessQueue.remove(e);
this.count = newCount; // write-volatile
}
break;
}
}
if (createNewEntry) {
// Mark the entry as loading so concurrent readers wait instead of loading.
loadingValueReference = new LoadingValueReference<K, V>();
if (e == null) {
// Key not found in this bucket's chain: create a new entry at the head.
e = newEntry(key, hash, first);
e.setValueReference(loadingValueReference);
table.set(index, e);
} else {
// Reuse the existing (collected/expired) entry, swapping in the
// loading marker.
e.setValueReference(loadingValueReference);
}
}
} finally {
// Release the segment lock.
unlock();
// Process any pending removal notifications / cleanup.
postWriteCleanup();
}
if (createNewEntry) {
try {
// Synchronizes on the entry to allow failing fast when a recursive load is
// detected. This may be circumvented when an entry is copied, but will fail fast most
// of the time.
synchronized (e) {
// Load the value synchronously on this thread.
// NOTE(review): the original comment said "async load", but loadSync
// blocks the calling thread until the value is loaded.
return loadSync(key, hash, loadingValueReference, loader);
}
} finally {
// Count the miss whether the load succeeded or failed.
statsCounter.recordMisses(1);
}
} else {
// A concurrent load is in progress; wait for it and return its result.
return waitForLoadingValue(e, key, valueReference);
}
}