TimingWheel.scala

// tickMs 当前时间轮中一个时间格表示的时间跨度
// wheelSize 当前时间轮的格数
// startMs 时间轮的创建时间
// taskCounter 所有时间轮中任务的总数
// queue 所有时间轮共用的一个任务队列,元素类型是TimerTaskList
private[timer] class TimingWheel(tickMs: Long, wheelSize: Int, startMs: Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) {
  // 当前时间轮的时间跨度
  // 当前时间轮只能处理时间范围在 currentTime~currentTime+interval之间的定时任务,
  // 超过范围要把任务添加到上层时间轮中
  private[this] val interval = tickMs * wheelSize
  // 每个成员对应时间轮里的一个时间格,保存TimerTaskList的数组
  private[this] val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) }
  // currentTime是时间轮的指针,tickMs的整数倍(即只可能按时间格步长前进),将整个时间轮分为到期部分和未到期部分
  // currentTime当前指向的时间格也属于到期部分
  // 初始化时近似等同于创建时间
  private[this] var currentTime = startMs - (startMs % tickMs)
  // 上层时间轮的引用
  @volatile private[this] var overflowWheel: TimingWheel = null

  def add(timerTaskEntry: TimerTaskEntry): Boolean = {
    val expiration = timerTaskEntry.expirationMs

    if (timerTaskEntry.cancelled) {
      false
    } else if (expiration < currentTime + tickMs) {
      // 任务已经过期了,即使是在当前指针指向的时间格也算过期
      false
    } else if (expiration < currentTime + interval) { // 在这个时间轮跨度内.添加到这个时间轮里
      // 根据任务的失效时间分配时间格
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)

      // 随着currentTime后移,当前时间轮能处理的时间段也在不断后移,
      // 新来的TimerTaskEntity会添加到复用原来清理过的时间格
      // 所以每次重置bucket的到期时间,保证最新
      if (bucket.setExpiration(virtualId * tickMs)) {
        queue.offer(bucket)
      }
      true
    } else {
      // 超过时间轮跨度,添加到上层
      if (overflowWheel == null) addOverflowWheel()
      overflowWheel.add(timerTaskEntry)
    }
  }

  // TimingWheel:尝试推进当前和上层时间轮的指针
  def advanceClock(timeMs: Long): Unit = {
    if (timeMs >= currentTime + tickMs) {
      // 修整currentTime是tickMs的整数倍, 即减去整除后多余的余数
      // 指针的前进并不是想象中的固定步长,而是直接跳到对应任务的超时时间
      currentTime = timeMs - (timeMs % tickMs)

      if (overflowWheel != null) overflowWheel.advanceClock(currentTime)
    }
  }
}

private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed {
  private[this] val expiration = new AtomicLong(-1L)

  def setExpiration(expirationMs: Long): Boolean = {
    // 这里判断新添加任务的expiration和原来的是否一致,保证幂等
    expiration.getAndSet(expirationMs) != expirationMs
  }

}

// 执行到期任务、阻塞等待最近到期任务
@threadsafe
class SystemTimer(executorName: String, tickMs: Long = 1, wheelSize: Int = 20,
                  startMs: Long = System.currentTimeMillis) extends Timer {
  // 固定线程池,执行到期任务
  private[this] val taskExecutor = Executors.newFixedThreadPool(1, new ThreadFactory() {
    def newThread(runnable: Runnable): Thread =
      Utils.newThread("executor-"+executorName, runnable, false)
  })

  // 所有时间轮共用队列
  private[this] val delayQueue = new DelayQueue[TimerTaskList]()
  private[this] val taskCounter = new AtomicInteger(0)
  // 最底层的时间轮
  private[this] val timingWheel = new TimingWheel(
    tickMs = tickMs, wheelSize = wheelSize, startMs = startMs, taskCounter = taskCounter, delayQueue
  )
  // 同步时间轮currentTime修改的读写锁
  private[this] val readWriteLock = new ReentrantReadWriteLock()

  // DelayedOperationPurgatory.tryCompleteElseWatch里如果未到时间的operation会触发add
  // DelayedOperationPurgatory是TimerTask的子类
  // 实质就是加锁版的addTimerTaskEntry
  def add(timerTask: TimerTask): Unit = {
    readLock.lock()
    try {
      addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + System.currentTimeMillis()))
    } finally {
      readLock.unlock()
    }
  }

  // 添加定时任务,未过期就加入时间轮,否则就执行
  private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
    if (!timingWheel.add(timerTaskEntry)) {
      // 时间轮添加任务返回false说明已经过期,直接执行该任务
      if (!timerTaskEntry.cancelled)
        taskExecutor.submit(timerTaskEntry.timerTask)
    }
  }

  // SystenTimer.advanClock
  def advanceClock(timeoutMs: Long): Boolean = {
    // 取出的是TimerTaskList类型成员
    // 当TimerTaskList因为超时被轮询出来并不一定代表里面所有的TimerTaskEntry一定就超时,
    // 所以对于没有超时的TimerTaskEntry需要重新加入到TimingWheel新的TimerTaskList中,对于超时的TimerTaskEntry则立即执行任务。
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) {
      // 定时队列能取出任务说明任务已到期
      writeLock.lock()
      try {
        while (bucket != null) {
          // 从最底层的时间轮开始推进指针
          timingWheel.advanceClock(bucket.getExpiration())
          // 从队列里取出的是TimerTaskList,然后遍历List,每条Entity过期就执行,未过期就重新从底层时间轮开始插入
          // 不就重复插入了吗?在哪里清空时间格的??
          bucket.flush(reinsert)
          // 此处poll不会阻塞
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }
}

trait TimerTask extends Runnable {
  // 延迟操作的延迟时长
  val delayMs: Long
}

abstract class DelayedOperation(override val delayMs: Long) extends TimerTask with Logging {
  // 此Operation是否完成
  private val completed = new AtomicBoolean(false)

  override def run(): Unit = {
    if (forceComplete())
      // 执行延迟操作到期执行的相关代码
      onExpiration()
  }

  def forceComplete(): Boolean = {
    // 如果Operation没有完成
    // 这个CAS保证线程安全
    if (completed.compareAndSet(false, true)) {
      // 从TimerTaskList里删除
      cancel()
      // 调用真正逻辑
      onComplete()
      true
    } else {
      false
    }
  }

  // 具体子类的业务逻辑实现
  def onComplete(): Unit
}

private class Watchers(val key: Any) {

  // DelayedOperation队列
  private[this] val operations = new LinkedList[T]()

  // 添加DelayOperation到队列
  def watch(t: T) {
    operations synchronized operations.add(t)
  }

  def tryCompleteWatched(): Int = {

    var completed = 0
    operations synchronized {
      // 遍历operations队列
      val iter = operations.iterator()
      while (iter.hasNext) {
        val curr = iter.next()
        if (curr.isCompleted) {
          // 其他线程完成了这个operation,这里就移除已完成的operation
          iter.remove()
        } else if (curr synchronized curr.tryComplete()) {
          // 尝试执行未完成的operation,如果返回isCompleted=true,表示立刻就能完成就删除
          completed += 1
          iter.remove()
        }
      }
    }

    // operations集合全部完成,从watchersForKey里删除这个键值对
    if (operations.size == 0)
      removeKeyIfEmpty(key, this)

    completed
  }

}

class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String,
         timeoutTimer: Timer, // SystemTimer对象
         brokerId: Int = 0, purgeInterval: Int = 1000, reaperEnabled: Boolean = true)
        extends Logging with KafkaMetricsGroup {

  // 管理watchers
  // values是Watchers类型的对象,表示一个DelayedOperation集合,底层是LinkedList
  // key是Watchers里DelayedOperation集合关心的对象(貌似关联的key就GroupCoordinator和ReplicaManager
  private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers(key)))
  // 对watchersForKey同步的读写锁
  private val removeWatchersLock = new ReentrantReadWriteLock()
  // delayedOperation的个数
  private[this] val estimatedTotalOperations = new AtomicInteger(0)

  // 主要作用:推进时间轮指针,定期清理watchersForKey中已完成的DelayedOperation
  private val expirationReaper = new ExpiredOperationReaper()

  if (reaperEnabled)
    // 初始化时就启动expirationReaper线程
    expirationReaper.start()

  private class ExpiredOperationReaper extends ShutdownableThread(
    "ExpirationReaper-%d".format(brokerId), false) {
    // 轮询检查推进时间轮指针和清理完成的operation
    override def doWork() {
      // 时间轮和SystemTimer的指针都是从这个线程驱动推进的
      advanceClock(200L)
    }

    def advanceClock(timeoutMs: Long) {
      timeoutTimer.advanceClock(timeoutMs)
      // 当DelayedOperationPurgatory与SystemTimer中的DelayOperation数量相差到一个阈值时,执行清理工作
      if (estimatedTotalOperations.get - delayed > purgeInterval) {
        estimatedTotalOperations.getAndSet(delayed)
        val purged = allWatchers.map(_.purgeCompleted()).sum
      }
    }
  }

  // 检测指定单个DelayedOperation是否已经完成,若未完成则添加到watchesForKeys和SystemTimer中
  def tryCompleteElseWatch(operation: T, watchKeys: Seq[Any]): Boolean = {
    var isCompletedByMe = operation synchronized operation.tryComplete()
    if (isCompletedByMe)
      return true

    // 将DelayedOperation添加到所有key对应的Watchers中
    // 一个DelayedOperation可能有多个watchKeys
    var watchCreated = false
    for(key <- watchKeys) {
      if (operation.isCompleted())
        // 若过程中被其他线程完成,则放弃后续添加过程
        // ExpiredOperationReaper线程会定期清理watchersForKey,所以不需要清理之前添加的key
        return false
      // 添加DelayedOperation到watchersForKey中对应key的watchers
      watchForOperation(key, operation)

      if (!watchCreated) {
        watchCreated = true
        estimatedTotalOperations.incrementAndGet()
      }
    }

    isCompletedByMe = operation synchronized operation.tryComplete()
    if (isCompletedByMe)
      return true

    // 将operation添加到SystemTimer里
    // 同时SystemTimer也会把任务添加到时间轮里
    if (! operation.isCompleted()) {
      timeoutTimer.add(operation)
      if (operation.isCompleted()) {
        // 如果完成从SystemTimer里删除
        operation.cancel()
      }
    }

    false
  }

  // 根据传入的key,尝试执行对应的Watchers中的operation
  def checkAndComplete(key: Any): Int = {
    val watchers = inReadLock(removeWatchersLock) { watchersForKey.get(key) }
      watchers.tryCompleteWatched()
  }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容