Running Python programs on Flink on YARN
Running a Python program in Flink local mode
Following the official docs, I wrote a Python program, Adder.py (code attached below). Starting it in local mode with ./bin/pyflink.sh ./Adder.py works:
➜ flink-1.7.1 ./bin/pyflink.sh ./Adder.py
Starting execution of program
Program execution finished
Job with JobID 8c7ab33a382c279b66089d43693fde52 has finished.
Job Runtime: 679 ms
The JobManager web UI shows a job with JobID 8c7ab33a382c279b66089d43693fde52, so local mode can run Python programs.
Running a Python program on Flink on YARN
Run on server 6.34 (which has a standalone Flink client that can connect to YARN and submit Flink jobs):
iknow@search-uda-6-34:~/wangbin/flink-1.7.2 $ ./bin/pyflink.sh ./Adder.py
By default the job lands on application_1550579025929_61820; even with the yid parameter specified, it still runs on application_1550579025929_61820.
So I switched to another machine, 0-107 (the steps for copying over the Hadoop and Flink environments are omitted here),
and started a YARN session (-qu sets the YARN queue, -nm the application name, -n the number of TaskManager containers, and -jm/-tm the JobManager/TaskManager memory):
./bin/yarn-session.sh -qu core -nm yarn-session-core -n 2 -jm 1024m -tm 2096m
The startup log is as follows:
iknow@search-uda-0-107:~/wangbin/flink/flink-1.7.2 $ ./bin/yarn-session.sh -qu core -nm yarn-session-core -n 2 -jm 1024m -tm 2096m
2019-03-01 19:53:31,148 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-01 19:53:31,150 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-01 19:53:31,151 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-01 19:53:31,151 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-01 19:53:31,151 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-03-01 19:53:31,152 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: parallelism.default, 1
2019-03-01 19:53:31,153 INFO org.apache.flink.configuration.GlobalConfiguration - Loading configuration property: rest.port, 8081
2019-03-01 19:53:31,815 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-01 19:53:31,978 WARN org.apache.hadoop.conf.Configuration - /home/iknow/hadoop-client/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.defaultFS; Ignoring.
2019-03-01 19:53:31,980 WARN org.apache.hadoop.conf.Configuration - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir; Ignoring.
2019-03-01 19:53:31,981 WARN org.apache.hadoop.conf.Configuration - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir; Ignoring.
2019-03-01 19:53:31,986 INFO org.apache.flink.runtime.security.modules.HadoopModule - Hadoop user set to iknow (auth:SIMPLE)
2019-03-01 19:53:32,451 INFO org.apache.hadoop.yarn.client.AHSProxy - Connecting to Application History server at data-hadoop-112-16.bjrs.zybang.com/192.168.112.16:10200
2019-03-01 19:53:32,479 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli - The argument n is deprecated in will be ignored.
2019-03-01 19:53:32,542 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Looking for the active RM in [rm1, rm2]...
2019-03-01 19:53:32,662 INFO org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider - Found active RM [rm1]
2019-03-01 19:53:32,720 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The JobManager or TaskManager memory is below the smallest possible YARN Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is '2048'. Please increase the memory size.YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances you requested will start.
2019-03-01 19:53:32,720 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=2096, numberTaskManagers=2, slotsPerTaskManager=1}
2019-03-01 19:53:33,265 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-03-01 19:53:33,274 WARN org.apache.flink.yarn.AbstractYarnClusterDescriptor - The configuration directory ('/home/iknow/wangbin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-01 19:53:42,592 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Submitting application master application_1550579025929_89782
2019-03-01 19:53:42,849 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1550579025929_89782
2019-03-01 19:53:42,849 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Waiting for the cluster to be allocated
2019-03-01 19:53:42,853 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2019-03-01 19:53:50,718 INFO org.apache.flink.yarn.AbstractYarnClusterDescriptor - YARN application has been deployed successfully.
2019-03-01 19:53:51,320 INFO org.apache.flink.runtime.rest.RestClient - Rest client endpoint started.
Flink JobManager is now running on search-as-107-45.bjcq.zybang.com:42915 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://search-as-107-45.bjcq.zybang.com:42915
The YARN session shows up in YARN's web UI. Note from the log that although -jm 1024m was requested, YARN's yarn.scheduler.minimum-allocation-mb of 2048 bumps the JobManager container up, hence masterMemoryMB=2048 in the cluster specification.
Clicking ApplicationMaster opens the JobManager web UI. Now start a Python program:
./bin/pyflink.sh Adder.py launched the job onto the application with ID application_1550579025929_89782,
with JobID 5612d8a9b68a72633a7e2138df4537ba.
Start another YARN session:
./bin/yarn-session.sh -qu emergency -nm yarn-session-emergency -n 4 -jm 1024m -tm 2096m
YARN now shows a session with ID application_1550579025929_89810.
Run the Python program again with ./bin/pyflink.sh:
this time the JobManager web UI shows a job with JobID 899b20e2cde7b4ef29afc5b90e56325d.
In other words, ./bin/pyflink.sh runs the Python program on the most recently started YARN session.
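Why the newest session wins is presumably down to Flink's YARN properties file: yarn-session.sh records the applicationID of the session it just started in a per-user file under the system temp directory, and the next submission picks it up from there. A minimal sketch that inspects this file, assuming the default /tmp location and the .yarn-properties-<user> naming:

import getpass
import os

# Assumed default location of Flink's YARN properties file:
# <java.io.tmpdir>/.yarn-properties-<user>, i.e. /tmp on a typical Linux box.
props_path = os.path.join("/tmp", ".yarn-properties-" + getpass.getuser())

with open(props_path) as f:
    for line in f:
        line = line.strip()
        if line.startswith("applicationID"):
            # e.g. applicationID=application_1550579025929_89846
            print("Next submission will target:", line.split("=", 1)[1])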
Start a third YARN session:
./bin/yarn-session.sh -qu emergency -nm yarn-session-emergency-2 -n 2 -jm 2048m -tm 4096m
YARN shows a session with ID application_1550579025929_89846.
Run the Python script with ./bin/pyflink.sh once more:
the JobManager of this newest session shows a job with JobID 9d7d9d58ee886fd74a7ec56ff487b80d.
Adder.py
from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction

class Adder(GroupReduceFunction):
    def reduce(self, iterator, collector):
        # All tuples in a group share the same word: take the first tuple,
        # then add up the counts carried by the remaining tuples.
        count, word = iterator.next()
        count += sum([x[0] for x in iterator])
        collector.collect((count, word))

env = get_environment()
data = env.from_elements("Who's there?",
                         "I think I hear them. Stand, ho! Who's there?")

data \
    .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
    .group_by(1) \
    .reduce_group(Adder(), combinable=True) \
    .output()

env.execute()
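A note on the example: grouping on field 1 (the word) means every tuple in a group carries the same word, so reduce() can take the first tuple and simply add the counts of the remaining ones. Passing combinable=True lets Flink pre-aggregate each group locally before the shuffle, much like a MapReduce combiner.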
Data Sources and Data Sinks in Flink's Python API
According to the official Flink docs, the Python API supports the following data sources (a short sketch follows the list):
- File-based:
  - read_text(path): reads files line by line and returns them as strings.
  - read_csv(path, type): parses files of comma- (or other character-) delimited fields. Returns a DataSet of tuples. The basic Java types and their Value counterparts are supported as field types.
- Collection-based:
  - from_elements(*args): creates a DataSet from a sequence of elements.
  - generate_sequence(from, to): generates the sequence of numbers in the given interval, in parallel.
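As a quick illustration of these sources, a minimal sketch (the commented-out read_text path is hypothetical):

from flink.plan.Environment import get_environment

env = get_environment()

# Collection-based: build a DataSet from literal elements.
words = env.from_elements("apache", "flink", "python")

# Parallel sequence of the numbers 1 through 100.
nums = env.generate_sequence(1, 100)

# File-based (hypothetical HDFS path): one string element per line.
# lines = env.read_text("hdfs:///tmp/input.txt")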
Supported data sinks (a sketch combining a source and a sink follows the list):
- write_text(): writes elements line-wise as strings; the strings are obtained by calling each element's str() method.
- write_csv(...): writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the object's str() method.
- output(): prints each element's str() value to standard out.
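Putting a source and a sink together, a minimal sketch (the output path is hypothetical, and write_csv takes further optional delimiter/write-mode parameters not shown here):

from flink.plan.Environment import get_environment

env = get_environment()

# Turn the numbers 1..10 into (n, n*n) tuples and write them out as CSV.
env.generate_sequence(1, 10) \
   .map(lambda n: (n, n * n)) \
   .write_csv("file:///tmp/squares.csv")

env.execute()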
Summary
To run a Python program on Flink on YARN:
- Start a YARN session.
- Run the Python program with the ./bin/pyflink.sh script; the job will run on the most recently started YARN session.
Open issue: with multiple YARN sessions running, I could not target a specific session by passing yid.
References:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/python.html