Flink JobManager HA高可用

Flink JobManager HA高可用


概述

本文主要讲解下Flink standalone下JobManager的HA高可用和Flink on yarn下JobManager的HA高可用。

JobManager 高可用(HA)

  • jobManager协调每个flink任务部署。它负责任务调度和资源管理。

  • 默认情况下,每个flink集群只有一个JobManager,这将导致一个单点故障(SPOF):如果JobManager挂了,则不能提交新的任务,并且运行中的程序也会失败。

  • 使用JobManager HA,集群可以从JobManager故障中恢复,从而避免SPOF(单点故障) 。 用户可以在standalone或 YARN集群 模式下,配置集群高可用

JobManager HA配置步骤

  • Standalone集群的高可用

    Standalone模式(独立模式)下JobManager的高可用性的基本思想是,任何时候都有一个 Master JobManager ,并且多个Standby JobManagers 。 Standby JobManagers可以在Master JobManager 挂掉的情况下接管集群成为Master JobManager。 这样保证了没有单点故障,一旦某一个Standby JobManager接管集群,程序就可以继续运行。 Standby JobManager和Master JobManager实例之间没有明确区别。 每个JobManager都可以成为Master或Standby节点

  • Yarn 集群高可用

    flink on yarn的HA 其实主要是利用yarn自己的job恢复机制

Flink Standalone集群HA配置

1.HA集群环境规划

使用两台节点实现两主两从集群
注意:
要启用JobManager高可用性,必须将高可用性模式设置为zookeeper,配置一个ZooKeeper
quorum,并配置一个masters文件存储所有JobManager hostname及其Web UI端口号。
Flink利用ZooKeeper实现运行中的JobManager节点之间的分布式协调。ZooKeeper是独立
于Flink的服务,它通过领导选举制和轻量级状态一致性存储来提供高度可靠的分布式协调。

2. 开始配置+启动

集群内所有节点的配置都一样,所以先从第一台机器50.63,50.64开始配置
ssh data-hadoop-50-63.xxx.com

Stanalone的配置
#首先按照之前配置 standalone 的参数进行修改
vi conf/flink-conf.yaml
jobmanager.rpc.address: data-hadoop-50-63.xxx.com
vi conf/slaves
data-hadoop-50-63.xxx.com
data-hadoop-50-64.xxx.com
HA的配置
[iknow@data-hadoop-50-63 flink-1.7.2]$ cat conf/slaves
#localhost
data-hadoop-50-63.xxx.com
data-hadoop-50-64.xxx.com
#jobmanager.rpc.address: localhost
jobmanager.rpc.address: data-hadoop-50-63.xxx.com
#jobmanager.rpc.address: 192.168.50.63

# high-availability: zookeeper
high-availability: zookeeper
#ZooKeeper 节点根目录,其下放置所有集群节点的 namespace
high-availability.zookeeper.path.root: /flink-standalone-ha

# high-availability.storageDir: hdfs:///flink/ha/
#ZooKeeper节点集群id,其中放置了集群所需的所有协调数据
high-availability.cluster-id: /cluster_one
#建议指定 hdfs 的全路径。如果某个 flink 节点没有配置 hdfs 的话,不指定全路径无法识别
# storageDir 存储了恢复一个 JobManager 所需的所有元数据。
high-availability.storageDir: hdfs://data-hadoop-50-63.xxx.com:9000/flink/flink-standalone-ha

# high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.quorum: data-hadoop-50-63.xxx.com:2181

#rest.port: 8081
rest.port: 18081
启动服务

先启动 hadoop 服务

sbin/start-all.sh

先启动 zk 服务

bin/zkServer.sh start

启动flink standaloneHA集群,在50.63节点上启动如下命令

bin/start-cluster.sh

去zk里查看
./bin/zkCli.sh


启动HA集群

[iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/start-cluster.sh
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host data-hadoop-50-63.bjrs.zybang.com.
Starting standalonesession daemon on host data-hadoop-50-64.bjrs.zybang.com.
Starting taskexecutor daemon on host data-hadoop-50-63.bjrs.zybang.com.
Starting taskexecutor daemon on host data-hadoop-50-64.bjrs.zybang.com.
3. 验证 HA 集群进程

50.63启动的进程



50.64上启动的进程


Flink web ui未启动在18081端口上,


通过查看50.63日志发现flink web ui启动在45141端口,

2019-02-25 10:48:48,787 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at data-hadoop-50-63.bjrs.com:45141
2019-02-25 10:48:48,813 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://data-hadoop-50-63.bjrs.zybang.com:45141.
2019-02-25 10:48:48,851 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://data-hadoop-50-63.bjrs.zybang.com:45141 was granted leadership with leaderSessionID=3a3047aa-bdb8-4234-9e1a-d3900081f169
2019-02-25 10:48:48,878 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2019-02-25 10:48:48,911 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher

jobManager启动在50.63上


50.64日志:


2019-02-25 10:48:46,421 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at data-hadoop-50-64.bjrs.zybang.com:41511
2019-02-25 10:48:46,421 INFO  org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService  - Starting ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/rest_server_lock'}.
2019-02-25 10:48:46,444 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://data-hadoop-50-64.bjrs.zybang.com:41511.
2019-02-25 10:48:46,502 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2019-02-25 10:48:46,527 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
4. 模拟jobmanager进程挂掉

现在50.63节点上的jobmanager是active的。我们手工把这个进程kill掉,模拟进程
挂掉的情况,来验证50.64上的standby状态的jobmanager是否可以正常切换到active。

[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
428021 DataNode
431794 TaskManagerRunner
428320 SecondaryNameNode
429073 NodeManager
76966 Worker
437470 Jps
427770 NameNode
431177 StandaloneSessionClusterEntrypoint
429833 QuorumPeerMain
[iknow@data-hadoop-50-63 flink-1.7.2]$ kill -9 431177
[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
428021 DataNode
431794 TaskManagerRunner
428320 SecondaryNameNode
429073 NodeManager
76966 Worker
427770 NameNode
437867 Jps
429833 QuorumPeerMain
5.验证HA切换

Kill掉50.63上的jobmanager再次访问50.63上的JobManager是访问不了的;
50.63节点上的jobmanager进程被手工kill掉了,然后50.64上的jobmanager会
自动切换为active,中间需要有一个时间差,稍微等一下。
Kill掉50.63上的jobManager之后,访问50.64

6.重启之前kill掉的jobmanager

在50.64上重启之前被kill的jobmanager

[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
428021 DataNode
431794 TaskManagerRunner
428320 SecondaryNameNode
76966 Worker
427770 NameNode
445947 Jps
429833 QuorumPeerMain
[iknow@data-hadoop-50-63 flink-1.7.2]$ ./bin/jobmanager.sh start
Starting standalonesession daemon on host data-hadoop-50-63.bjrs.zybang.com.
[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
446533 StandaloneSessionClusterEntrypoint
428021 DataNode
431794 TaskManagerRunner
428320 SecondaryNameNode
76966 Worker
427770 NameNode
446667 Jps
429833 QuorumPeerMain

这个时候active的jobManager还是50.64


查看hdfs,看到是有/flink/flink-standalone-ha/cluster_one 目录生成的(blob是什么目录,后面在研究下,从flink jobManager的日志看是有生成blob目录的)


2019-02-25 10:48:47,747 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://flink@data-hadoop-50-63.bjrs.zybang.com:36169
2019-02-25 10:48:48,042 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore             - Creating highly available BLOB storage directory at hdfs://data-hadoop-50-63.bjrs.zybang.com:9000/flink/flink-standalone-ha//cluster_one/blob

Flink on yarn集群HA

1. HA集群环境规划

flink on yarn的HA其实是利用yarn自己的恢复机制。
在这需要用到zk,主要是因为虽然flink-on-yarn cluster HA依赖于Yarn自己的集群机制,但是Flink Job在恢复时,需要依赖检查点产生的快照,而这些快照虽然配置在hdfs,但是其元数据信息保存在zookeeper 中,所以我们还要配置 zookeeper 的信息。

首先需要修改hadoop中 yarn-site.xml 中的配置,设置提交应用程序的最大尝试次数

<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>8</value>
<description>
The maximum number of application master execution attempts.
</description>

配置Yarn重试次数

vi conf/flink-conf.yaml
yarn.application-attempts: 8

此参数代表Flink Job(yarn中称为application)在Jobmanager(或者叫Application Master)恢复时,允许重启的最大次数。
注意,Flink On Yarn环境中,当Jobmanager(Application Master)失败时,yarn会尝试重启JobManager(AM),重启后,会重新启动Flink的Job(application)。因此,yarn.application-attempts的设置不应该超过yarn.resourcemanager.am.max-attemps.

3. 启动flink on yarn,测试HA

在50.63上启动zk和hadoop

bin/zkServer.sh start
sbin/start-all.sh
2019-02-25 11:48:20,914 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Initiating client connection, connectString=data-hadoop-50-63.bjrs.zybang.com:2181 sessionTimeout=60000 watcher=org.apache.flink.shaded.curator.org.apache.curator.ConnectionState@182f1e9a
2019-02-25 11:48:20,926 WARN  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas-7156394372293067519.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it.
2019-02-25 11:48:20,929 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Opening socket connection to server data-hadoop-50-63.bjrs.zybang.com/192.168.50.63:2181
2019-02-25 11:48:20,930 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Socket connection established to data-hadoop-50-63.bjrs.zybang.com/192.168.50.63:2181, initiating session
2019-02-25 11:48:20,930 ERROR org.apache.flink.shaded.curator.org.apache.curator.ConnectionState  - Authentication failed
2019-02-25 11:48:20,938 INFO  org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Session establishment complete on server data-hadoop-50-63.bjrs.zybang.com/192.168.50.63:2181, sessionid = 0x169228cb2e10006, negotiated timeout = 40000
2019-02-25 11:48:20,940 INFO  org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager  - State change: CONNECTED
2019-02-25 11:48:21,093 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
Flink JobManager is now running on data-hadoop-50-63.bjrs.zybang.com:38695 with leader id 45d62639-aac4-4903-ae50-a4d69f987e4a.
JobManager Web Interface: http://data-hadoop-50-63.bjrs.zybang.com:34621
2019-02-25 11:48:21,155 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:
yarn application -kill application_1551066475117_0001

Hdfs上有HA对应的目录生成


YARN_CONF_DIR(如果没设置则在HADOOP_CONF_DIR)下看看日志情况,当前AM的日志路径在$HADOOP_CONF_DIR/userlogs//下,可以看出Yarn在重启YarnApplicationMasterRunner进程,并在重启期后重新提交Flink的Job。
Hadoop下是有logs/userlogs/目录的

jobmanager 进程就在对应的节点的(YarnSessionClusterEntrypoint)进程里面
所以想要测试 jobmanager 的 HA 情况,只需要拿 YarnSessionClusterEntrypoint 这个进程进行测试即可。
执行下面命令手工模拟 kill 掉 jobmanager(YarnSessionClusterEntrypoint)
Kill 掉jobManager进程

[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
428021 DataNode
428320 SecondaryNameNode
76966 Worker
458095 NodeManager
729 YarnSessionClusterEntrypoint
427770 NameNode
457803 ResourceManager
1581 Jps
457275 FlinkYarnSessionCli
429833 QuorumPeerMain
[iknow@data-hadoop-50-63 flink-1.7.2]$ kill -9 729
[iknow@data-hadoop-50-63 flink-1.7.2]$ jps
428021 DataNode
3540 Jps
428320 SecondaryNameNode
76966 Worker
458095 NodeManager
427770 NameNode
457803 ResourceManager
457275 FlinkYarnSessionCli
3438 YarnSessionClusterEntrypoint
429833 QuorumPeerMain

如图看到Attempt ID由000001变为000002、000003…,示进程也有所变化,,说明HA切换成功了。
进入zk查看zk里的目录,flink-yarn-ha是有生成的。


./bin/zkCli.sh 命令进去zk,quit退出zk。

HA配置,这里只列出High Availability的配置,其他的配置未列出

#==============================================================================
# High Availability
#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
#
# high-availability: zookeeper
high-availability: zookeeper
high-availability.zookeeper.path.root: /flink-yarn-ha
# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
#
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...)
#
# high-availability.storageDir: hdfs:///flink/ha/
#high-availability.cluster-id: /cluster_one
high-availability.storageDir: hdfs://data-hadoop-50-63.xxx.com:9000/flink/flink-yarn-ha

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default client
#
# high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.quorum: data-hadoop-50-63.xxx.com:2181
yarn.application-attempts: 8
high-availability.jobmanager.port: 18085

# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容

  • 概述 JobManager 协调每个 Flink 部署。它负责调度和资源管理。 默认情况下,每个 Flink 集群...
    尼小摩阅读 9,928评论 0 2
  • Flink on yarn部署模式 背景 Flink是一个高性能,高吞吐,低延迟的流处理框架。它不仅仅是作为一个流...
    it_zzy阅读 72,935评论 5 31
  • apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,它能够基于同一个Flink运行时(...
    生活的探路者阅读 1,475评论 3 8
  • 1.下载Flink压缩包 下载地址:http://flink.apache.org/downloads.html。...
    尼小摩阅读 62,935评论 2 22
  • 01 你有没有过这样的时刻:因为别人觉得你不够外向,你拼了命表现活泼开朗的样子?父母耳提面命地要你学习别人家的孩子...
    郑喜月阅读 537评论 3 9