Flink JobGraph: JobVertex

1. JobVertex

One building block of the JobGraph deserves special mention: JobVertex. A JobVertex produces IntermediateDataSets, is wired to other JobVertices through JobEdges, and is also the unit into which chained operators are merged. In short, a JobVertex is the abstraction of a task at the job level.
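The wiring described above can be sketched with a stripped-down model. The `Mini*` classes below are simplified stand-ins invented for illustration, not the real Flink types, and `connectNewDataSetAsInput` only mimics the shape of the real method:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-ins for JobVertex / IntermediateDataSet / JobEdge,
// illustrating only the producer -> dataset -> consumer wiring.
class MiniJobVertex {
    final String name;
    final List<MiniDataSet> results = new ArrayList<>(); // datasets this vertex produces
    final List<MiniJobEdge> inputs = new ArrayList<>();  // edges this vertex reads from

    MiniJobVertex(String name) { this.name = name; }

    // Mimics JobVertex#connectNewDataSetAsInput: the upstream vertex gets a new
    // dataset, and a new edge feeds that dataset into this (consuming) vertex.
    void connectNewDataSetAsInput(MiniJobVertex producer) {
        MiniDataSet ds = new MiniDataSet(producer);
        producer.results.add(ds);
        this.inputs.add(new MiniJobEdge(ds, this));
    }
}

class MiniDataSet {
    final MiniJobVertex producer;
    MiniDataSet(MiniJobVertex producer) { this.producer = producer; }
}

class MiniJobEdge {
    final MiniDataSet source;
    final MiniJobVertex target;
    MiniJobEdge(MiniDataSet source, MiniJobVertex target) {
        this.source = source;
        this.target = target;
    }
}

public class TopologyDemo {
    public static void main(String[] args) {
        MiniJobVertex source = new MiniJobVertex("source");
        MiniJobVertex sink = new MiniJobVertex("sink");
        sink.connectNewDataSetAsInput(source);
        System.out.println(source.results.size());                    // 1
        System.out.println(sink.inputs.get(0).source.producer.name);  // source
    }
}
```

Note that the edge points from an IntermediateDataSet to a consumer, not vertex-to-vertex directly; that is exactly the `results`/`inputs` split we will see in the fields below.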

2. Source Code

Part 1: Fields

    // --------------------------------------------------------------------------------------------
    // Members that define the structure / topology of the graph
    // --------------------------------------------------------------------------------------------

    // The ID of the current JobVertex; see the associated JobVertexID class.
    /** The ID of the vertex. */
    private final JobVertexID id;

    // Alternative vertex IDs.
    /** The alternative IDs of the vertex. */
    private final ArrayList<JobVertexID> idAlternatives = new ArrayList<>();

    // The operators contained in this JobVertex.
    /** The IDs of all operators contained in this vertex. */
    private final ArrayList<OperatorID> operatorIDs = new ArrayList<>();

    // Alternative operator IDs.
    /** The alternative IDs of all operators contained in this vertex. */
    private final ArrayList<OperatorID> operatorIdsAlternatives = new ArrayList<>();

    // The IntermediateDataSets produced by this JobVertex, one per result writer.
    /** List of produced data sets, one per writer */
    private final ArrayList<IntermediateDataSet> results = new ArrayList<IntermediateDataSet>();

    // The JobEdges through which this JobVertex reads its input data;
    // one edge per reader, each connecting to an upstream IntermediateDataSet.
    /** List of edges with incoming data. One per Reader. */
    private final ArrayList<JobEdge> inputs = new ArrayList<JobEdge>();

    // Number of subtasks this task is split into at runtime, i.e. its parallelism.
    /** Number of subtasks to split this task into at runtime.*/
    private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT;

    // Maximum number of subtasks this task may be split into at runtime.
    /** Maximum number of subtasks to split this task into at runtime. */
    private int maxParallelism = -1;

    // The minimum resources this JobVertex requires.
    /** The minimum resource of the vertex */
    private ResourceSpec minResources = ResourceSpec.DEFAULT;

    // The preferred resources for this JobVertex.
    /** The preferred resource of the vertex */
    private ResourceSpec preferredResources = ResourceSpec.DEFAULT;

    // Custom configuration passed to this task at runtime.
    /** Custom configuration passed to the assigned task at runtime. */
    private Configuration configuration;

    // The class whose invoke() is executed for this task.
    /** The class of the invokable. */
    private String invokableClassName;

    // Whether this JobVertex can be stopped (not whether it has already stopped).
    /** Indicates whether this job vertex is stoppable or not. */
    private boolean isStoppable = false;

    // Optional source of input splits, produced from the input format.
    /** Optionally, a source of input splits */
    private InputSplitSource<?> inputSplitSource;

    // The name of this JobVertex.
    /** The name of the vertex. This will be shown in runtime logs and will be in the runtime environment */
    private String name;

    // A slot sharing group allows subtasks of different JobVertices to run concurrently in one slot.
    // In particular, co-location requires the involved tasks to belong to the same sharing group.
    /** Optionally, a sharing group that allows subtasks from different job vertices to run concurrently in one slot */
    private SlotSharingGroup slotSharingGroup;

    // The co-location group: its member vertices run their i-th subtasks in the same slot.
    /** The group inside which the vertex subtasks share slots */
    private CoLocationGroup coLocationGroup;

    // The following fields are recorded in the JSON plan.
    // The operator name.
    /** Optional, the name of the operator, such as 'Flat Map' or 'Join', to be included in the JSON plan */
    private String operatorName;

    // A description of this operator.
    /** Optional, the description of the operator, like 'Hash Join', or 'Sorted Group Reduce',
     * to be included in the JSON plan */
    private String operatorDescription;

    /** Optional, pretty name of the operator, to be displayed in the JSON plan */
    private String operatorPrettyName;

    // JSON describing the optimizer properties of the operator result; written into the JSON plan.
    /** Optional, the JSON for the optimizer properties of the operator result,
     * to be included in the JSON plan */
    private String resultOptimizerProperties;

    // The input dependency constraint used when scheduling this vertex:
    // 1. ANY: a subtask can be scheduled as soon as any of its inputs has consumable data.
    // 2. ALL: a subtask is scheduled only once all of its inputs have consumable data.
    // The distinction matters when reasoning about the job's consistency semantics.
    /** The input dependency constraint to schedule this vertex. */
    private InputDependencyConstraint inputDependencyConstraint = InputDependencyConstraint.ANY;
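Among these fields, the InputDependencyConstraint is the one that directly gates scheduling. A minimal sketch of the ANY/ALL decision (a hypothetical helper written for this post, not Flink's scheduler code):

```java
import java.util.List;

// Simplified sketch of how InputDependencyConstraint.ANY / ALL gate scheduling.
// Each boolean stands for "this input has consumable data".
public class InputConstraintDemo {
    enum Constraint { ANY, ALL }

    static boolean canSchedule(Constraint c, List<Boolean> inputsConsumable) {
        if (c == Constraint.ANY) {
            // one consumable input is enough to start the consuming subtask
            return inputsConsumable.stream().anyMatch(b -> b);
        }
        // ALL: every input must be consumable before the subtask starts
        return inputsConsumable.stream().allMatch(b -> b);
    }

    public static void main(String[] args) {
        List<Boolean> oneReady = List.of(true, false);
        System.out.println(canSchedule(Constraint.ANY, oneReady)); // true
        System.out.println(canSchedule(Constraint.ALL, oneReady)); // false
    }
}
```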

Part 2: Methods

  1. The concrete task to be executed
    /**
     * Returns the invokable class which represents the task of this vertex
     * 
     * @param cl The classloader used to resolve user-defined classes
     * @return The invokable class, <code>null</code> if it is not set
     */
    // Returns the concrete task class this JobVertex executes at runtime,
    // e.g. DataSinkTask or BatchTask.
    public Class<? extends AbstractInvokable> getInvokableClass(ClassLoader cl) {
        if (cl == null) {
            throw new NullPointerException("The classloader must not be null.");
        }
        if (invokableClassName == null) {
            return null;
        }

        try {
            return Class.forName(invokableClassName, true, cl).asSubclass(AbstractInvokable.class);
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("The user-code class could not be resolved.", e);
        }
        catch (ClassCastException e) {
            throw new RuntimeException("The user-code class is no subclass of " + AbstractInvokable.class.getName(), e);
        }
    }
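The method is essentially `Class.forName` against a caller-supplied classloader plus an `asSubclass` check. The same pattern can be exercised with JDK-only types; here `AbstractList` stands in for `AbstractInvokable`:

```java
import java.util.AbstractList;

public class InvokableLookupDemo {
    // Same resolution pattern as JobVertex#getInvokableClass, but against JDK types:
    // resolve a class by name through a classloader and verify it extends a base class.
    static Class<? extends AbstractList> resolve(String className, ClassLoader cl) {
        if (cl == null) {
            throw new NullPointerException("The classloader must not be null.");
        }
        try {
            // initialize=true mirrors the Flink call; asSubclass throws
            // ClassCastException if the class does not extend AbstractList
            return Class.forName(className, true, cl).asSubclass(AbstractList.class);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("The class could not be resolved.", e);
        } catch (ClassCastException e) {
            throw new RuntimeException(
                "Not a subclass of " + AbstractList.class.getName(), e);
        }
    }

    public static void main(String[] args) {
        ClassLoader cl = InvokableLookupDemo.class.getClassLoader();
        System.out.println(resolve("java.util.ArrayList", cl).getSimpleName()); // ArrayList
    }
}
```

In Flink the classloader is the user-code classloader, which is why it must be passed in rather than defaulting to the system classloader.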

2. Methods related to operator chaining

    /**
     * Associates this vertex with a slot sharing group for scheduling. Different vertices in the same
     * slot sharing group can run one subtask each in the same slot.
     * 
     * @param grp The slot sharing group to associate the vertex with.
     */
    // Associates this JobVertex with a slot sharing group for scheduling:
    // vertices in the same group may each run one subtask in the same slot.
    public void setSlotSharingGroup(SlotSharingGroup grp) {
        if (this.slotSharingGroup != null) {
            this.slotSharingGroup.removeVertexFromGroup(id);
        }

        this.slotSharingGroup = grp;
        if (grp != null) {
            grp.addVertexToGroup(id);
        }
    }

    /**
     * Gets the slot sharing group that this vertex is associated with. Different vertices in the same
     * slot sharing group can run one subtask each in the same slot. If the vertex is not associated with
     * a slot sharing group, this method returns {@code null}.
     * 
     * @return The slot sharing group to associate the vertex with, or {@code null}, if not associated with one.
     */
    // If this JobVertex has not been assigned to a slot sharing group, there is none
    // associated with it and null is returned. The default group is named "default".
    @Nullable
    public SlotSharingGroup getSlotSharingGroup() {
        return slotSharingGroup;
    }

    /**
     * Tells this vertex to strictly co locate its subtasks with the subtasks of the given vertex.
     * Strict co-location implies that the n'th subtask of this vertex will run on the same parallel computing
     * instance (TaskManager) as the n'th subtask of the given vertex.
     * 
     * NOTE: Co-location is only possible between vertices in a slot sharing group.
     * 
     * NOTE: This vertex must (transitively) depend on the vertex to be co-located with. That means that the
     * respective vertex must be a (transitive) input of this vertex.
     * 
     * @param strictlyCoLocatedWith The vertex whose subtasks to co-locate this vertex's subtasks with.
     * 
     * @throws IllegalArgumentException Thrown, if this vertex and the vertex to co-locate with are not in a common
     *                                  slot sharing group.
     * 
     * @see #setSlotSharingGroup(SlotSharingGroup)
     */
    // Strictly co-locates this vertex's subtasks with those of the given vertex:
    // the n-th subtask of this vertex runs in the same slot, on the same TaskManager,
    // as the n-th subtask of the given vertex. Two preconditions:
    // 1. both vertices must belong to the same slot sharing group;
    // 2. this vertex must (transitively) consume the given vertex's output, e.g. via a forward edge.
    // Operator chaining relies on this mechanism as well.
    public void setStrictlyCoLocatedWith(JobVertex strictlyCoLocatedWith) {
        if (this.slotSharingGroup == null || this.slotSharingGroup != strictlyCoLocatedWith.slotSharingGroup) {
            throw new IllegalArgumentException("Strict co-location requires that both vertices are in the same slot sharing group.");
        }

        CoLocationGroup thisGroup = this.coLocationGroup;
        CoLocationGroup otherGroup = strictlyCoLocatedWith.coLocationGroup;
        // At this point both vertices are guaranteed to be in the same slot sharing group.
        if (otherGroup == null) {
            if (thisGroup == null) { // neither vertex has a co-location group yet: create one containing both
                CoLocationGroup group = new CoLocationGroup(this, strictlyCoLocatedWith);
                this.coLocationGroup = group;
                strictlyCoLocatedWith.coLocationGroup = group;
            }
            else { // only this vertex has a co-location group: the given vertex joins it
                thisGroup.addVertex(strictlyCoLocatedWith);
                strictlyCoLocatedWith.coLocationGroup = thisGroup;
            }
        }
        else {
            if (thisGroup == null) { // only the given vertex has a co-location group: join it
                otherGroup.addVertex(this);
                this.coLocationGroup = otherGroup;
            }
            else { // both already have distinct co-location groups: merge them
                thisGroup.mergeInto(otherGroup);
            }
        }
    }
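The four branches can be replayed with a tiny stand-in model. `MiniVertex`/`MiniGroup` are illustrative names invented here, not Flink classes, and the slot-sharing-group precondition is omitted:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for CoLocationGroup: tracks its member vertices.
class MiniGroup {
    final List<MiniVertex> vertices = new ArrayList<>();

    void add(MiniVertex v) {
        vertices.add(v);
        v.group = this;
    }

    // Move all members of this group into the other group.
    void mergeInto(MiniGroup other) {
        for (MiniVertex v : vertices) {
            other.add(v);
        }
        vertices.clear();
    }
}

// Stand-in for JobVertex, keeping only the co-location logic.
class MiniVertex {
    MiniGroup group; // null until co-located with another vertex

    void coLocateWith(MiniVertex other) {
        if (other.group == null) {
            if (this.group == null) {  // neither has a group: create one for both
                MiniGroup g = new MiniGroup();
                g.add(this);
                g.add(other);
            } else {                   // only this vertex has a group: other joins it
                this.group.add(other);
            }
        } else {
            if (this.group == null) {  // only the other vertex has a group: join it
                other.group.add(this);
            } else {                   // both have distinct groups: merge them
                this.group.mergeInto(other.group);
            }
        }
    }
}

public class CoLocationDemo {
    public static void main(String[] args) {
        MiniVertex a = new MiniVertex();
        MiniVertex b = new MiniVertex();
        MiniVertex c = new MiniVertex();
        a.coLocateWith(b);                      // creates a group {a, b}
        c.coLocateWith(a);                      // c joins a's group
        System.out.println(a.group == c.group); // true
    }
}
```

Whichever branch is taken, the invariant afterwards is that both vertices point at one shared group object, which is what lets the scheduler pin their i-th subtasks together.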
  3. ResultPartitionType
    Describes how the intermediate dataset produced by a JobVertex is partitioned and exchanged.
/**
     * Blocking partitions represent blocking data exchanges, where the data stream is first
     * fully produced and then consumed. This is an option that is only applicable to bounded
     * streams and can be used in bounded stream runtime and recovery.
     *
     * <p>Blocking partitions can be consumed multiple times and concurrently.
     *
     * <p>The partition is not automatically released after being consumed (like for example the
     * {@link #PIPELINED} partitions), but only released through the scheduler, when it determines
     * that the partition is no longer needed.
     */
    // Blocking data exchange: the result must be fully produced before it can be consumed.
    // Only applicable to bounded streams; usable both in bounded-stream execution and in recovery.
    // Note that a blocking partition can be consumed multiple times and concurrently;
    // it is released by the scheduler, not automatically after consumption.
    BLOCKING(false, false, false, false),

    /**
     * BLOCKING_PERSISTENT partitions are similar to {@link #BLOCKING} partitions, but have
     * a user-specified life cycle.
     *
     * <p>BLOCKING_PERSISTENT partitions are dropped upon explicit API calls to the
     * JobManager or ResourceManager, rather than by the scheduler.
     *
     * <p>Otherwise, the partition may only be dropped by safety-nets during failure handling
     * scenarios, like when the TaskManager exits or when the TaskManager loses connection
     * to JobManager / ResourceManager for too long.
     */
    // Like BLOCKING, but with a user-defined life cycle:
    // the partition is dropped through explicit API calls to the JobManager or ResourceManager,
    // not by the scheduler. Failing that, it is only cleaned up by safety nets during failure
    // handling, e.g. when a TaskManager exits or loses its connection to the
    // JobManager/ResourceManager for too long.
    BLOCKING_PERSISTENT(false, false, false, true),

    /**
     * A pipelined streaming data exchange. This is applicable to both bounded and unbounded streams.
     *
     * <p>Pipelined results can be consumed only once by a single consumer and are automatically
     * disposed when the stream has been consumed.
     *
     * <p>This result partition type may keep an arbitrary amount of data in-flight, in contrast to
     * the {@link #PIPELINED_BOUNDED} variant.
     */
    // Pipelined data exchange, applicable to both bounded and unbounded streams.
    // A pipelined partition can be consumed only once, by a single consumer,
    // and is disposed automatically once the stream has been consumed.
    PIPELINED(true, true, false, false),

    /**
     * Pipelined partitions with a bounded (local) buffer pool.
     *
     * <p>For streaming jobs, a fixed limit on the buffer pool size should help avoid that too much
     * data is being buffered and checkpoint barriers are delayed. In contrast to limiting the
     * overall network buffer pool size, this, however, still allows to be flexible with regards
     * to the total number of partitions by selecting an appropriately big network buffer pool size.
     *
     * <p>For batch jobs, it will be best to keep this unlimited ({@link #PIPELINED}) since there are
     * no checkpoint barriers.
     */
    // Like PIPELINED, but with a bounded (local) buffer pool.
    // For streaming jobs, capping the buffer pool limits the amount of in-flight data,
    // so checkpoint barriers are not delayed behind large buffers.
    // Unlike capping the overall network buffer pool, a per-partition limit stays flexible
    // with respect to the total number of partitions, provided the network buffer pool
    // is sized generously enough.
    // Batch jobs have no checkpoint barriers, so the unlimited PIPELINED variant is preferable there.
    PIPELINED_BOUNDED(true, true, true, false);
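As a reading aid for the four boolean constructor arguments: in the Flink version quoted here they appear to correspond to isPipelined, hasBackPressure, isBounded and isPersistent. The stand-alone replica below encodes that assumption; treat the flag order as an assumption, not the authoritative definition:

```java
// Stand-alone replica of the flags behind the ResultPartitionType constants above.
// Assumed flag order: (isPipelined, hasBackPressure, isBounded, isPersistent).
public enum MiniResultPartitionType {
    BLOCKING(false, false, false, false),
    BLOCKING_PERSISTENT(false, false, false, true),
    PIPELINED(true, true, false, false),
    PIPELINED_BOUNDED(true, true, true, false);

    private final boolean isPipelined;     // consumable while still being produced
    private final boolean hasBackPressure; // slow consumers slow the producer down
    private final boolean isBounded;       // bounded amount of in-flight data
    private final boolean isPersistent;    // life cycle controlled by the user, not the scheduler

    MiniResultPartitionType(boolean isPipelined, boolean hasBackPressure,
                            boolean isBounded, boolean isPersistent) {
        this.isPipelined = isPipelined;
        this.hasBackPressure = hasBackPressure;
        this.isBounded = isBounded;
        this.isPersistent = isPersistent;
    }

    public boolean isPipelined()     { return isPipelined; }
    public boolean isBlocking()      { return !isPipelined; }
    public boolean hasBackPressure() { return hasBackPressure; }
    public boolean isBounded()       { return isBounded; }
    public boolean isPersistent()    { return isPersistent; }
}
```

Reading the constants this way makes the pairings obvious: BLOCKING_PERSISTENT is BLOCKING plus the persistent flag, and PIPELINED_BOUNDED is PIPELINED plus the bounded flag.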

3. Full Source
JobVertex.java (full source)
