Spark ShutdownHook

按照优先级在关闭时执行一系列操作,在spark内用途很广泛,主要是释放资源,删除文件等

使用

    // SparkContext在初始化时注册,设定优先级和要调用的函数
    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      logInfo("Invoking stop() from shutdown hook")
      try {
        stop() // 关闭SparkContext时要释放清理的对象
      } catch {
        case e: Throwable =>
          logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
      }
    }

ShutdownHookManager

ShutdownHookManager的静态函数addShutdownHook依赖SparkShutdownHookManager类进行具体的逻辑处理

  //处理的核心
  private lazy val shutdownHooks = {
    val manager = new SparkShutdownHookManager()
    manager.install()
    manager
  }

  // 如果在jvm执行shutdown hook过程中添加钩子,jvm会抛出异常
  def inShutdown(): Boolean = {
    try {
      val hook = new Thread {
        override def run() {}
      }
      // scalastyle:off runtimeaddshutdownhook
      Runtime.getRuntime.addShutdownHook(hook)
      // scalastyle:on runtimeaddshutdownhook
      Runtime.getRuntime.removeShutdownHook(hook)
    } catch {
      case ise: IllegalStateException => return true
    }
    false
  }
  
  //添加钩子
  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
    shutdownHooks.add(priority, hook)
  }
private [util] class SparkShutdownHookManager {
  // 每个钩子都记录到优先队列里
  private val hooks = new PriorityQueue[SparkShutdownHook]()
  @volatile private var shuttingDown = false

  /**
   * Install a hook to run at shutdown and run all registered hooks in order.
   */
  def install(): Unit = {
    val hookTask = new Runnable() {
      // runAll 函数会从优先队列中取出所有的钩子并运行
      override def run(): Unit = runAll()
    }
    // 依赖Hadoop的ShutdownHookManager机制
    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
  }
}

Hadoop的ShutdownHookManager机制是通过JDK的addShutdownHook实现,收到信号后,将所有的钩子按照优先级取出执行

JDK ShutdownHook

java.lang.Runtime包内的函数:

addShutdownHook(Thread hook)

注册一个JVM关闭的钩子,JVM会为了响应以下两类事件而关闭:

  • 程序正常退出exits,当最后的非守护线程退出时,或者在调用System.exit方法时
  • 因为用户中断,终止terminated虚拟机,例如Ctrl + C,或发生系统级别的事件,比如用户退出(产生SIGHUP,系统的默认处理方式是终止进程)或系统关闭

shutdown hook是一个已初始化但尚未启动的线程。虚拟机开始关闭时,它会以某种顺序启动所有已经注册的关闭钩子,并让它们同时运行。运行完所有的钩子后,虚拟机就会停止。注意,关闭期间会继续运行守护线程,如果通过调用exit方法来发起关闭,那么也会继续运行非守护线程

一旦开始了关闭序列shutdown sequence,则只能通过调用halt方法来停止,此方法可强行终止虚拟机,在关闭过程中不能注册新的关闭钩子或取消注册先前已注册的钩子。否则会导致抛出 IllegalStateException

shutdown hook在虚拟机生命周期中的特定时间运行,因此在编写代码时要特别注意,保证线程安全,并尽可能地避免死锁

shutdown hook应该快速地完成其工作。当程序调用exit时,虚拟机应该迅速地关闭并退出。由于用户退出或系统关闭而终止虚拟机时,底层的操作系统可能只允许在固定的时间内关闭并退出。因此在关闭钩子中不应该进行用户交互或执行长时间的计算

与其他所有线程一样,通过调用线程所属线程组的ThreadGroup.uncaughtException方法,可在关闭钩子中处理未捕获的异常。此方法的默认实现是将该异常的堆栈跟踪打印至System.err并终止线程;它不会导致虚拟机退出或暂停。

仅在很少的情况下,虚拟机可能会中止abort,也就是没有完全关闭就停止运行。虚拟机被外部终止时会出现这种现象,比如在Unix上使用SIGKILL信号。如果native方法出错,虚拟机也会终止,例如内部数据结构损坏或试图访问不存在的内存。如果虚拟机中止,则无法保证是否将运行关闭钩子

File.deleteOnExit()进程结束后删除文件,其实现机制就是ShutdownHook

JDK信号处理相关

主要是sun.misc包内两个类:SignalSignalHandler

Signal构造函数

// 参数为对应信号的名称
public Signal(String var1)

通过kill -l可以查看信号的名称

SignalHandler接口的核心是处理函数handle(Signal var1)

public interface SignalHandler {
    SignalHandler SIG_DFL = new NativeSignalHandler(0L);
    SignalHandler SIG_IGN = new NativeSignalHandler(1L);

    void handle(Signal var1);
}

静态方法Signal.handle(Signal, SignalHandler)注册Signal对应的处理器

org.apache.spark.util.SignalUtils类中就添加了对"TERM", "HUP", "INT"三个信号的处理

  def registerLogger(log: Logger): Unit = synchronized {
    if (!loggerRegistered) {
      Seq("TERM", "HUP", "INT").foreach { sig =>
        SignalUtils.register(sig) {
          log.error("RECEIVED SIGNAL " + sig)
          false
        }
      }
      loggerRegistered = true
    }
  }

内部创建对应的信号Signal,以及对应的ActionHandler,其实现了SignalHandler接口的handle方法,内部会执行注册的方法,这里就是进行log输出

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容

  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    胜浩_ae28阅读 5,085评论 0 23
  • 一.线程安全性 线程安全是建立在对于对象状态访问操作进行管理,特别是对共享的与可变的状态的访问 解释下上面的话: ...
    黄大大吃不胖阅读 828评论 0 3
  • 任务和线程的启动很容易。 在大多数时候, 我们都会让它们运行直到结束,或者让它们自行停止。然而,有时候我们希望提前...
    好好学习Sun阅读 1,138评论 0 0
  • 爱六月啊阅读 263评论 1 1
  • 一、 我要考韩语! 我要过六级! 我要拿奖学金! 我要....... 最近看了看15年的目标,这些flag真是立起...
    不晓晓阅读 833评论 0 1