本文介绍如何在一个静态(可能是异构的)集群上分布式的运行Flink。
要求
软件要求
Flink运行在所有的类UNIX的环境上,例如Linux,Mac OS X,和 Cygwin(对于Windows),并且希望集群由一个master节点和一个或多个worker节点组成。在设置系统之前,请确认在每个节点上都安装了下述软件:
- Java 1.8.x或更高,
- ssh (必须运行sshd来使用管理远程组件的Flink脚本)
如果你的集群不满足这些软件要求,你必须安装/升级它。
在集群节点上使用免密SSH和相同的目录结构将允许你使用我们的脚本控制一切。
JAVA_HOME
配置
Flink要求JAVA_HOME
环境变量被设置在master和所有的worker节点上,并指向Java的安装目录。
你可以通过conf/flink-conf.yaml
中的env.java.home
变量设置它的值。
Flink设置
请去下载页面下载安装包。确认选择的Flink包匹配你的Hadoop版本。如果你不计划使用Hadoop,任意版本都行。
下载最新版本后,将包复制到您的主节点并解压它:
tar xzf flink-*.tgz
cd flink-*
配置Flink
在提取了系统文件后,你必须通过编辑conf/flink-conf.yaml文件来为集群配置Flink。
设置 jobmanager.rpc.address
来指向你的master节点。你还应该通过设置jobmanager.heap.mb
和taskmanager.heap.mb
来定义每个节点上允许分配给JVM的最大主内存。
这些值的单位都是MB。如果一些worker节点有更多的内存来分配给Flink系统,你可以通过再指定节点上设置环境变量FLINK_TM_HEAP
来覆盖默认值。
最后,你必须提供一个集群中的节点列表作为worker节点。因此,类似于HDFS的配置,编辑 conf/slaves文件,输入每个worker节点的IP/主机名。每个worker节点稍后会运行一个TaskManager。
下面的例子展示了三个节点的设置(IP地址从 10.0.0.1 到 10.0.0.3,主机名分别为 master, worker1, worker2),并配置文件的内容(需要在所有机器上相同的可访问路径上)。
Flink目录必须在每个节点的同一目录下可用。你可以使用共享的NFS目录,或者拷贝整个Flink目录到每个worker节点上。
更详细的和其它配置选项请阅读配置说明。
特别是,
- 每个JobManager的可用内存(
jobmanager.heap.mb
) - 每个TaskManager的可用内存(
taskmanager.heap.mb
) - 每台机器的可用CPU数(
taskmanager.numberOfTaskSlots
) - 集群的总的CPU数(
parallelism.default
)和 - 临时目录(
taskmanager.tmp.dirs
)
都是非常重要的配置值。
启动Flink
下面的脚本在本地节点上启动一个JobManager,并通过SSH连接到slaves文件中列出的所有worker节点来在每个节点上启动TaskManager。现在你的Flink系统已经起来并运行了。运行在本地节点上的JobManager现在在配置的RPC端口上接收作业。
假设你再master节点上,并且在Flink目录内:
bin/start-cluster.sh
对于停止Flink,这里也有一个stop-cluster.sh
脚本。
添加 JobManager/TaskManager 实例到集群中
你可以使用bin/jobmanager.sh
和 bin/taskmanager.sh
脚本添加 JobManager 和 TaskManager实例到运行中的集群中。
添加一个JobManager
bin/jobmanager.sh ((start|start-foreground) cluster)|stop|stop-all
添加一个TaskManager
bin/taskmanager.sh start|start-foreground|stop|stop-all
确保在您希望启动/停止相应实例的主机上调用这些脚本。