// tickMs 当前时间轮中一个时间格表示的时间跨度
// wheelSize 当前时间轮的格数
// startMs 时间轮的创建时间
// taskCounter 所有时间轮中任务的总数
// queue 所有时间轮共用的一个任务队列,元素类型是TimerTaskList
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
// 当前时间轮的时间跨度
// 当前时间轮只能处理时间范围在 currentTime~currentTime+interval之间的定时任务,
// 超过范围要把任务添加到上层时间轮中
private[this] val interval = tickMs * wheelSize
// 每个成员对应时间轮里的一个时间格,保存TimerTaskList的数组
private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
// currentTime是时间轮的指针,tickMs的整数倍(即只可能按时间格步长前进),将整个时间轮分为到期部分和未到期部分
// currentTime当前指向的时间格也属于到期部分
// 初始化时近似等同于创建时间
private[this] var currentTime = startMs - (startMs % tickMs)
// 上层时间轮的引用
@volatile private[this] var overflowWheel: TimingWheel = null
def add(timerTaskEntry: TimerTaskEntry): Boolean = {
val expiration = timerTaskEntry.expirationMs
if (timerTaskEntry.cancelled) {
false
} else if (expiration < currentTime + tickMs) {
// 任务已经过期了,即使是在当前指针指向的时间格也算过期
false
} else if (expiration < currentTime + interval) { // 在这个时间轮跨度内.添加到这个时间轮里
// 根据任务的失效时间分配时间格
val virtualId = expiration / tickMs
val bucket = buckets((virtualId % wheelSize.toLong).toInt)
bucket.add(timerTaskEntry)
// 随着currentTime后移,当前时间轮能处理的时间段也在不断后移,
// 新来的TimerTaskEntity会添加到复用原来清理过的时间格
// 所以每次重置bucket的到期时间,保证最新
if (bucket.setExpiration(virtualId * tickMs)) {
queue.offer(bucket)
}
true
} else {
// 超过时间轮跨度,添加到上层
if (overflowWheel == null) addOverflowWheel()
overflowWheel.add(timerTaskEntry)
}
}
// TimingWheel:尝试推进当前和上层时间轮的指针
def advanceClock(timeMs: Long): Unit = {
if (timeMs >= currentTime + tickMs) {
// 修整currentTime是tickMs的整数倍, 即减去整除后多余的余数
// 指针的前进并不是想象中的固定步长,而是直接跳到对应任务的超时时间
currentTime = timeMs - (timeMs % tickMs)
if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
}
}
}
private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
private[this] val expiration = new AtomicLong(-1L)
def setExpiration(expirationMs: Long): Boolean = {
// 这里判断新添加任务的expiration和原来的是否一致,保证幂等
expiration.getAndSet(expirationMs) != expirationMs
}
}
// 执行到期任务、阻塞等待最近到期任务
@threadsafe
class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20,
startMs: Long = System.currentTimeMillis) extends Timer {
// 固定线程池,执行到期任务
private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
def newThread(runnable: Runnable): Thread =
Utils.newThread("executor-"+executorName, runnable, false)
})
// 所有时间轮共用队列
private[this] val delayQueue = new DelayQueue[TimerTaskList]()
private[this] val taskCounter = new AtomicInteger(0)
// 最底层的时间轮
private[this] val timingWheel = new TimingWheel(
tickMs = tickMs, wheelSize = wheelSize, startMs = startMs, taskCounter = taskCounter, delayQueue
)
// 同步时间轮currentTime修改的读写锁
private[this] val readWriteLock = new ReentrantReadWriteLock()
// DelayedOperationPurgatory.tryCompleteElseWatch里如果未到时间的operation会触发add
// DelayedOperationPurgatory是TimerTask的子类
// 实质就是加锁版的addTimerTaskEntry
def add(timerTask: TimerTask): Unit = {
readLock.lock()
try {
addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis()))
} finally {
readLock.unlock()
}
}
// 添加定时任务,未过期就加入时间轮,否则就执行
private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
if (!timingWheel.add(timerTaskEntry)) {
// 时间轮添加任务返回false说明已经过期,直接执行该任务
if (!timerTaskEntry.cancelled)
taskExecutor.submit(timerTaskEntry.timerTask)
}
}
// SystenTimer.advanClock
def advanceClock(timeoutMs: Long): Boolean = {
// 取出的是TimerTaskList类型成员
// 当TimerTaskList因为超时被轮询出来并不一定代表里面所有的TimerTaskEntry一定就超时,
// 所以对于没有超时的TimerTaskEntry需要重新加入到TimingWheel新的TimerTaskList中,对于超时的TimerTaskEntry则立即执行任务。
var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
if (bucket != null) {
// 定时队列能取出任务说明任务已到期
writeLock.lock()
try {
while (bucket != null) {
// 从最底层的时间轮开始推进指针
timingWheel.advanceClock(bucket.getExpiration())
// 从队列里取出的是TimerTaskList,然后遍历List,每条Entity过期就执行,未过期就重新从底层时间轮开始插入
// 不就重复插入了吗?在哪里清空时间格的??
bucket.flush(reinsert)
// 此处poll不会阻塞
bucket = delayQueue.poll()
}
} finally {
writeLock.unlock()
}
true
} else {
false
}
}
}
trait TimerTask extends Runnable {
// 延迟操作的延迟时长
val delayMs: Long
}
abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
// 此Operation是否完成
private val completed = new AtomicBoolean(false)
override def run(): Unit = {
if (forceComplete())
// 执行延迟操作到期执行的相关代码
onExpiration()
}
def forceComplete(): Boolean = {
// 如果Operation没有完成
// 这个CAS保证线程安全
if (completed.compareAndSet(false, true)) {
// 从TimerTaskList里删除
cancel()
// 调用真正逻辑
onComplete()
true
} else {
false
}
}
// 具体子类的业务逻辑实现
def onComplete(): Unit
}
private class Watchers(val key: Any) {
// DelayedOperation队列
private[this] val operations = new LinkedList[T]()
// 添加DelayOperation到队列
def watch(t: T) {
operations synchronized operations.add(t)
}
def tryCompleteWatched(): Int = {
var completed = 0
operations synchronized {
// 遍历operations队列
val iter = operations.iterator()
while (iter.hasNext) {
val curr = iter.next()
if (curr.isCompleted) {
// 其他线程完成了这个operation,这里就移除已完成的operation
iter.remove()
} else if (curr synchronized curr.tryComplete()) {
// 尝试执行未完成的operation,如果返回isCompleted=true,表示立刻就能完成就删除
completed += 1
iter.remove()
}
}
}
// operations集合全部完成,从watchersForKey里删除这个键值对
if (operations.size == 0)
removeKeyIfEmpty(key, this)
completed
}
}
class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
timeoutTimer: Timer, // SystemTimer对象
brokerId: Int = 0, purgeInterval: Int = 1000, reaperEnabled: Boolean = true)
extends Logging with KafkaMetricsGroup {
// 管理watchers
// values是Watchers类型的对象,表示一个DelayedOperation集合,底层是LinkedList
// key是Watchers里DelayedOperation集合关心的对象(貌似关联的key就GroupCoordinator和ReplicaManager
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
// 对watchersForKey同步的读写锁
private val removeWatchersLock = new ReentrantReadWriteLock()
// delayedOperation的个数
private[this] val estimatedTotalOperations = new AtomicInteger(0)
// 主要作用:推进时间轮指针,定期清理watchersForKey中已完成的DelayedOperation
private val expirationReaper = new ExpiredOperationReaper()
if (reaperEnabled)
// 初始化时就启动expirationReaper线程
expirationReaper.start()
private class ExpiredOperationReaper extends ShutdownableThread(
"ExpirationReaper-%d".format(brokerId), false) {
// 轮询检查推进时间轮指针和清理完成的operation
override def doWork() {
// 时间轮和SystemTimer的指针都是从这个线程驱动推进的
advanceClock(200L)
}
def advanceClock(timeoutMs: Long) {
timeoutTimer.advanceClock(timeoutMs)
// 当DelayedOperationPurgatory与SystemTimer中的DelayOperation数量相差到一个阈值时,执行清理工作
if (estimatedTotalOperations.get - delayed > purgeInterval) {
estimatedTotalOperations.getAndSet(delayed)
val purged = allWatchers.map(_.purgeCompleted()).sum
}
}
}
// 检测指定单个DelayedOperation是否已经完成,若未完成则添加到watchesForKeys和SystemTimer中
def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
var isCompletedByMe = operation synchronized operation.tryComplete()
if (isCompletedByMe)
return true
// 将DelayedOperation添加到所有key对应的Watchers中
// 一个DelayedOperation可能有多个watchKeys
var watchCreated = false
for(key <- watchKeys) {
if (operation.isCompleted())
// 若过程中被其他线程完成,则放弃后续添加过程
// ExpiredOperationReaper线程会定期清理watchersForKey,所以不需要清理之前添加的key
return false
// 添加DelayedOperation到watchersForKey中对应key的watchers
watchForOperation(key, operation)
if (!watchCreated) {
watchCreated = true
estimatedTotalOperations.incrementAndGet()
}
}
isCompletedByMe = operation synchronized operation.tryComplete()
if (isCompletedByMe)
return true
// 将operation添加到SystemTimer里
// 同时SystemTimer也会把任务添加到时间轮里
if (! operation.isCompleted()) {
timeoutTimer.add(operation)
if (operation.isCompleted()) {
// 如果完成从SystemTimer里删除
operation.cancel()
}
}
false
}
// 根据传入的key,尝试执行对应的Watchers中的operation
def checkAndComplete(key: Any): Int = {
val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
watchers.tryCompleteWatched()
}
}
TimingWheel.scala
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 在创建scala项目的时候create Scala SDK: 这里选择bin上一级目录,然后点击OK 这样就出现了...
- 多维数组:数组的元素,还是数组,数组套数组,就是多维数组 构造指定行与列的二维数组:Array.ofDim方法 构...
- http://www.cnblogs.com/cbscan/articles/4147709.html
- Scala 篇 单例对象 在 Java 中实现单例对象通常需要自己实现一个类并创建 getInstance() 的...
- Overview 样本类是 Scala 中使用关键字 case class 声明的类。它可以隐式调用构造方法进行初...