Flink on yarn运行python程序

Flink on yarn运行python程序

Flink本地local模式运行python程序

参考官网写了python程序Adder.py(后面贴上代码),在本地local模式用./bin/pyflink.sh ./Adder.p启动是可以的

➜  flink-1.7.1 ./bin/pyflink.sh ./Adder.py
Starting execution of program
Program execution finished
Job with JobID 8c7ab33a382c279b66089d43693fde52 has finished.
Job Runtime: 679 ms

jobManager web ui上查看是有JobID 为8c7ab33a382c279b66089d43693fde52的任务的。


可以看到本地local模式是可以运行python程序的。

Flink on yarn运行python程序

在服务器6.34上执行(6.34上有单机的flink客户端,可以连接yarn,向yarn提交flink程序)
iknow@search-uda-6-34:~/wangbin/flink-1.7.2 $ ./bin/pyflink.sh ./Adder.py

默认会起动到application_1550579025929_61820上,指定了yid参数也会默认跑在application_1550579025929_61820上

决定换一台机器试试,到0-107上,这里省略拷贝hadoop和flink环境的过程,
启动一个yarn session:

./bin/yarn-session.sh -qu core -nm yarn-session-core -n 2 -jm 1024m -tm 2096m

启动日志如下:

iknow@search-uda-0-107:~/wangbin/flink/flink-1.7.2 $ ./bin/yarn-session.sh -qu core -nm yarn-session-core -n 2 -jm 1024m -tm 2096m
2019-03-01 19:53:31,148 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, localhost
2019-03-01 19:53:31,150 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2019-03-01 19:53:31,151 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.size, 1024m
2019-03-01 19:53:31,151 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.size, 1024m
2019-03-01 19:53:31,151 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2019-03-01 19:53:31,152 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2019-03-01 19:53:31,153 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2019-03-01 19:53:31,815 WARN  org.apache.hadoop.util.NativeCodeLoader                       - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-03-01 19:53:31,978 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/core-site.xml:an attempt to override final parameter: fs.defaultFS;  Ignoring.
2019-03-01 19:53:31,980 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.datanode.data.dir;  Ignoring.
2019-03-01 19:53:31,981 WARN  org.apache.hadoop.conf.Configuration                          - /home/iknow/hadoop-client/etc/hadoop/hdfs-site.xml:an attempt to override final parameter: dfs.namenode.name.dir;  Ignoring.
2019-03-01 19:53:31,986 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to iknow (auth:SIMPLE)
2019-03-01 19:53:32,451 INFO  org.apache.hadoop.yarn.client.AHSProxy                        - Connecting to Application History server at data-hadoop-112-16.bjrs.zybang.com/192.168.112.16:10200
2019-03-01 19:53:32,479 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                 - The argument n is deprecated in will be ignored.
2019-03-01 19:53:32,542 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Looking for the active RM in [rm1, rm2]...
2019-03-01 19:53:32,662 INFO  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  - Found active RM [rm1]
2019-03-01 19:53:32,720 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The JobManager or TaskManager memory is below the smallest possible YARN Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is '2048'. Please increase the memory size.YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances you requested will start.
2019-03-01 19:53:32,720 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=2096, numberTaskManagers=2, slotsPerTaskManager=1}
2019-03-01 19:53:33,265 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory       - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2019-03-01 19:53:33,274 WARN  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - The configuration directory ('/home/iknow/wangbin/flink/flink-1.7.2/conf') contains both LOG4J and Logback configuration files. Please delete or rename one of them.
2019-03-01 19:53:42,592 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Submitting application master application_1550579025929_89782
2019-03-01 19:53:42,849 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1550579025929_89782
2019-03-01 19:53:42,849 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Waiting for the cluster to be allocated
2019-03-01 19:53:42,853 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deploying cluster, current state ACCEPTED
2019-03-01 19:53:50,718 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - YARN application has been deployed successfully.
2019-03-01 19:53:51,320 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
Flink JobManager is now running on search-as-107-45.bjcq.zybang.com:42915 with leader id 00000000-0000-0000-0000-000000000000.
JobManager Web Interface: http://search-as-107-45.bjcq.zybang.com:42915

yarn上是有这个yarn session的


点击ApplicationMaster进去JobManager的web ui,启动一个python程序
./bin/pyflink.sh Adder.py程序起动到了ID为application_1550579025929_89782 的APPID上


JobID为5612d8a9b68a72633a7e2138df4537ba。

再启动一个yarn session
./bin/yarn-session.sh -qu emergency -nm yarn-session-emergency -n 4 -jm 1024m -tm 2096m


在yarn上看到有ID为application_1550579025929_89810的yarn session


再用./bin/pyflink.sh脚本运行python程序


查看JobManager 的web ui上看到是有899b20e2cde7b4ef29afc5b90e56325d的Job ID的。


也就是./bin/pyflink.sh脚本会将python程序运行在最新启动的yarn session上。

再启动一个yarn session

./bin/yarn-session.sh -qu emergency -nm yarn-session-emergency-2 -n 2 -jm 2048m -tm 4096m

Yarn上看到有ID为application_1550579025929_89846的yarn session


./bin/pyflink.sh脚本运行python脚本


到JobManager上看到有jobID为9d7d9d58ee886fd74a7ec56ff487b80d的任务。


Adder.py

from flink.plan.Environment import get_environment
from flink.functions.GroupReduceFunction import GroupReduceFunction

class Adder(GroupReduceFunction):
  def reduce(self, iterator, collector):
    count, word = iterator.next()
    count += sum([x[0] for x in iterator])
    collector.collect((count, word))

env = get_environment()
data = env.from_elements("Who's there?",
 "I think I hear them. Stand, ho! Who's there?")

data \
  .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
  .group_by(1) \
  .reduce_group(Adder(), combinable=True) \
  .output()

env.execute()

flink官网python的Datasource和DataSink

从flink官网看到python的api支持的数据源(Data Source)如下:

  • 基于文件的:

    • read_text(path):按行读取文件并将其作为字符串返回。

    • read_csv(path, type):解析逗号(或其他字符)分隔字段的文件。返回元组的DataSet。支持基本java类型及其Value对应作为字段类型。

  • 基于集合:

    • from_elements(*args):从Seq创建数据集。所有数据元

    • generate_sequence(from, to):并行生成给定间隔中的数字序列。

支持的Data Sink

  • write_text():按字符串顺序写入数据元。通过调用每个数据元的str()方法获得字符串。

  • write_csv(...):将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的str()方法。

  • output():打印标准输出上每个数据元的str()值。

总结

Flink on yarn运行python程序的步骤:

    1. 开启一个yarn session
    1. 用 ./bin/pyflink.sh脚本运行python程序,程序会跑在最新启动的yarn session上。
  • 遇到的问题,如果启动多个yarn session,无法通过指定yid来运行到不同yarn session上。

参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/python.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容

  • Flink on yarn部署模式 背景 Flink是一个高性能,高吞吐,低延迟的流处理框架。它不仅仅是作为一个流...
    it_zzy阅读 72,931评论 5 31
  • 1、Standalone 软件要求: Java 1.8.x or higher ssh JAVA_HOME配置 Y...
    JACKbayue阅读 17,299评论 0 6
  • 1.下载Flink压缩包 下载地址:http://flink.apache.org/downloads.html。...
    尼小摩阅读 62,933评论 2 22
  • 今天去看了前任,现在还在电影的剧情里。走心,更扎心。一路上跟胡老师影评,都觉得剧情不是太怎么样,但是像极了人...
    轻瑜伽阅读 230评论 0 0
  • 地上的枯叶在抓蜻蜓的影子,风停了的时候,自己又会是在哪里?枯叶会不会也是因为这样想,被风牵过思绪带走,随风走了。风...
    4e38ebe82a21阅读 185评论 0 1