通过Flink 程序模板来学习StreamExecutionEnvironment 、DataStream 、StreamTransformation类

一、回到起点

一切都从最初的起点开始,我们来看官网给出的Flink Job程序的框架:

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * Here, you can start creating your execution plan for Flink.
         *
         * Start with getting some data from the environment, like
         *  env.readTextFile(textPath);
         *
         * then, transform the resulting DataStream<String> using operations
         * like
         *  .filter()
         *  .flatMap()
         *  .join()
         *  .coGroup()
         *
         * and many more.
         * Have a look at the programming guide for the Java API:
         *
         * http://flink.apache.org/docs/latest/apis/streaming/index.html
         *
         */

        // execute program
        env.execute("Flink Streaming Java API Skeleton");

    }
}

首先我们要创建StreamExecutionEnvironment,这是我们的流计算执行环境(上下文)包含并行度,state,time相关配置。包含StreamTransformation,当然也包含创建DataStreamSource的方法addSource(也是 fromCollection/readTextFile方法中会调用的base方法)。

public abstract class StreamExecutionEnvironment {
    //其他略,主要看一下transformations和addSource方法
    protected final List<StreamTransformation<?>> transformations = new ArrayList<>();

    //重点关注返回DataStreamSource传入了this参数,把自己也传递进去,这个this(StreamExecutionEnvironment 上下文)会在后面DataStream不断进行变化的时候,被回调其addOperator方法,不断的维护transformations 列表
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
        //其他略
        return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
    }

    public void addOperator(StreamTransformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }
}

这里看到addSource 传入的第一个参数是 SourceFunction类实例。SourceFunction类是Flink所有流数据source的基本接口,定义了source需要实现的run(SourceContext<T> ctx)方法,并通过SourceContext来进行发射数据,当调用cancel的方法时候需要可以break run方法中的loop。而SourceContext接口,使用 collect(T element)来进行数据的发射。

二、DataStreamSource类

整个拓扑,是从env. addSource(或者fromCollection/readTextFile方法,底层也会调用addSource)开始的,DataStreamSource代表着DataStream的起始点。

DataStreamSource是DataStream的子类。

DataStream<--SingleOutputStreamOperator<--DataStreamSource

看看DataStreamSource父类DataStream类最主要的两个变量:

public class DataStream<T> {

    protected final StreamExecutionEnvironment environment;

    protected final StreamTransformation<T> transformation;

StreamExecutionEnvironment 代表执行环境
StreamTransformation 代表产生DataStream的转换操作
同样子类DataStreamSource 也需要这两个变量,看一下其构造函数:

public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
              //略
    }

第二个参数传输入的是SourceTransformation类,是StreamTransformation的子类,通过构造函数可以看出来比父类StreamTransformation多了一个变量StreamSource operator 。

public class SourceTransformation<T> extends StreamTransformation<T> {

    private final StreamSource<T, ?> operator;

    public SourceTransformation(
            String name,
            StreamSource<T, ?> operator,
            TypeInformation<T> outputType,
            int parallelism) {
        super(name, outputType, parallelism);
        this.operator = operator;
    }

我们继续看一下这个多出来的StreamSource类的operator。稍微复杂一些:

 * @param <OUT> Type of the output elements
 * @param <SRC> Type of the source function of this stream source operator
public class StreamSource<OUT, SRC extends SourceFunction<OUT>>
        extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> {

继承了AbstractUdfStreamOperator 并实现了StreamOperator接口,而AbstractUdfStreamOperator最终通过继承 AbstractStreamOperator,也实现了StreamOperator接口

StreamOperator<--AbstractStreamOperator<--AbstractUdfStreamOperator

还是先看最Base的StreamOperator接口吧,主要定义了lifecycle,包括:setup、open、close等。通常通过实现OneInputStreamOperator或者TwoInputStreamOperator接口来创建operators,但是呢!StreamSource不一般啊,因为是Source,所以直接实现了StreamOperator,虽然感觉好像没有必要啊,因为已经继承了间接实现了StreamOperator的AbstractUdfStreamOperator类。

public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {

AbstractStreamOperator类,提供了默认的lifecycle方法。

// Base class for all stream operators. Operators that contain a user function should extend the class AbstractUdfStreamOperator instead (which is a specialized subclass of this class).
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, Serializable {

AbstractUdfStreamOperator多包含了一个用户定义function,会在open、close等方法,额外在调用用户自定义function里面对应的方法,丰富需求实现。

/**
 * This is used as the base class for operators that have a user-defined
 * function. This class handles the opening and closing of the user-defined functions,
 * as part of the operator life cycle.
 *
 * @param <F>
 *            The type of the user function
 */
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {

    /** The user function. */
    protected final F userFunction;

    @Override
    public void open() throws Exception {
        super.open();
        FunctionUtils.openFunction(userFunction, new Configuration());
    }

    @Override
    public void close() throws Exception {
        super.close();
        functionsClosed = true;
        FunctionUtils.closeFunction(userFunction);

嗯,上面几个StreamSource类相关的父类祖宗类接口介绍完了。看看StreamSource类本身,回忆一下,研究StreamSource类是因为在创建DataStreamSource的时候传入的第二个参数SourceTransformation类包含一个StreamSource类的成员operator。

public class SourceTransformation<T> extends StreamTransformation<T> {

    private final StreamSource<T, ?> operator;

花开两朵各表一枝啊,回过头来看DataStreamSource类,本身还继承了SingleOutputStreamOperator。

public class DataStreamSource<T> extends SingleOutputStreamOperator<T> {

    public DataStreamSource(StreamExecutionEnvironment environment,
            TypeInformation<T> outTypeInfo, StreamSource<T, ?> operator,
            boolean isParallel, String sourceName) {
//调用了父类SingleOutputStreamOperator构造函数
        super(environment, new SourceTransformation<>(sourceName, operator, outTypeInfo, environment.getParallelism()));
//略
    }

会调用 SingleOutputStreamOperator的构造函数,继续调用DataStream的构造函数。

public class SingleOutputStreamOperator<T> extends DataStream<T> {

    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        super(environment, transformation);
    }

由此 env. addSource 返回 DataStreamSource类的过程分析完毕,此时我们已经有了一个起点,就是DataStream类的子类DataStreamSource,包含了当前的上下文StreamExecutionEnvironment和变换StreamTransformation(SourceTransformation类)两个成员。在env. addSource 的时候给DataStreamSource类的StreamTransformation子类变量SourceTransformation类传入了一个StreamSource 类成员(是StreamOperator 类的实现& AbstractUdfStreamOperator子类)这个StreamSource类成员,包含传入的SourceFunction类的udfFunction来读取Source的源数据的方法(后面再来看运行的时候如何具体处理的。)

至此,我们已经从env. addSource ,分析了DataStream类和他的子类DataStreamSource(对应数据源)。

接下来,我们要跟随DataStream类,看他的Transformation操作方法,探寻DataStream类和Transformation之间的关系。

三、定义在DataStream类上的Transformation方法:

回忆一下,DataStream类,代表相同类型元素的流,可以通过转换(transformation)来实现转换为另一个DataStream,transformation如map,filter等。

DataStream类最主要的两个变量:

    protected final StreamExecutionEnvironment environment;

    protected final StreamTransformation<T> transformation;

StreamExecutionEnvironment 代表执行环境
StreamTransformation 代表产生自己的DataStream的转换操作,比如上面我们分析的DataStreamSource类 对应的就是SourceTransformation类。

DataStream类包含的方法如map:

    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

        TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
                Utils.getCallLocationName(), true);

        return transform("Map", outType, new StreamMap<>(clean(mapper)));
    }

可以看到map方法调用的是transform方法,返回的是DataStream 的子类SingleOutputStreamOperator类,代表有一个输出类型的DataStream。

public class SingleOutputStreamOperator<T> extends DataStream<T> {

接下来,我们看一下transform方法,本质上还是根据转换,生成新的DataStream(map对应返回的是子类SingleOutputStreamOperator),我们已经知道DataStream要传入两个参数1 ExecutionEnvironment本身,2转换类本身。(StreamTransformation的子类)

    public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

        transformation.getOutputType();

        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operator,
                outTypeInfo,
                environment.getParallelism());

        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }

代码里面首先创建转换对象,根据操作,返回了一个OneInputTransformation类对象。这里有一个重点需要关注的就是,不同于兄弟类SourceTransformation,OneInputTransformation类对象有一个成员变量:input,是StreamTransformation类,代表这个转换的输入转换(可以理解为前一个转换操作)。调用transform方法的时候,传入的是this.transformation。第二个成员变量operator是StreamOperator类的子类,OneInputStreamOperator类。

public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {

    private final StreamTransformation<IN> input;

    private final OneInputStreamOperator<IN, OUT> operator;

可以看到 map方法,输入transform的 operator是 StreamMap类。 StreamMap类是AbstractUdfStreamOperator子类,实现了OneInputStreamOperator接口。

/**
* A {@link StreamOperator} for executing {@link MapFunction MapFunctions}.
*/
@Internal
public class StreamMap<IN, OUT>
       extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
       implements OneInputStreamOperator<IN, OUT> {

   private static final long serialVersionUID = 1L;

   public StreamMap(MapFunction<IN, OUT> mapper) {
       super(mapper);
       chainingStrategy = ChainingStrategy.ALWAYS;
   }

   @Override
   public void processElement(StreamRecord<IN> element) throws Exception {
       output.collect(element.replace(userFunction.map(element.getValue())));
   }
}

至此,我们也可以获得如下认识:一个StreamTransformation类会包含一个名叫operator 的StreamOperator类的成员变量,代表对应算子,同时,通常会包含一个名叫input的StreamTransformation类,记录输入(上一个)转换,这样实际上就是形成了一个Transformation List链表,把所有的Transformation都串成串!!特别在运行时,创建运行Dataflow时候很有用。也依靠transform方法返回前的一句操作,将当前生成的新的StreamTransformation传入ExecutionEnvironment的transformations成员变量,能够让ExecutionEnvironment 也能记录所有的Transformation,真正拥有了Transformation List!!StreamExecutionEnvironment真正可以把控全局。

 transform 方法中:
    getExecutionEnvironment().addOperator(resultTransform);
public abstract class StreamExecutionEnvironment {

    protected final List<StreamTransformation<?>> transformations = new ArrayList<>();

    public void addOperator(StreamTransformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
      }

DataStream类中其他转换方法类似,如filter

    public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter) {
        return transform("Filter", getType(), new StreamFilter<>(clean(filter)));

    }

再如flatMap

    public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

        TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
                getType(), Utils.getCallLocationName(), true);

        return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));

    }

都是调用transform方法,只不过传入的名称和算子对象不一样,对应的自定义userFunction就不一样,调用processElement时候使用userFunction方式不同。对应处理数据方式和结果也就不同,不同转换(算子)才会有不同的功能。

至此我们从env. addSource开始(返回第一个DataStream类子类DataStreamSource类),通过不断调用DataStream类的转换方法(map,filter等)来得到新的DataStream类。

这个过程中宏观上,我们可以看到StreamExecutionEnvironment 对象env,不断记录DataTransformation对象,维护整个transformation list。

微观上,每个DataStream类在做转换的时候,会首先创建转换对应的DataTransformation对象(被传入上一个DataTransformation对象,也就是创建这个新DataTransformation对象的DataStream被创建对应的DataTransformation对象,因此在DataTransformation层面,整体形成了一个DataTransformation链表,每一个节点对象维护自己上一个节点)。之后返回一个新的DataStream类对象,拥有当前DataStream的StreamExecutionEnvironment成员,和创建新的DataStream类对象的DataTransformation对象。

四、StreamTransformation类

StreamTransformation代表创建DataStream类的操作。DataStream类内部是包含一个创建他自己的StreamTransformation。

//构造函数,重点包括了id(唯一的)和outputType转换的输出类型
    public StreamTransformation(String name, TypeInformation<T> outputType, int parallelism) {
        this.id = getNewNodeId();
        this.name = Preconditions.checkNotNull(name);
        this.outputType = outputType;
        this.parallelism = parallelism;
        this.slotSharingGroup = null;
    }

    //通过static变量和方法,保证了每个StreamTransformation的唯一ID
    protected static Integer idCounter = 0;

    public static int getNewNodeId() {
        idCounter++;
        return idCounter;
    }

其他

如果是本地IDE运行的话StreamExecutionEnvironment.getExecutionEnvironment()返回一个LocalStreamEnvironment子类,execute方法开始执行整个流计算的拓扑,在一个mini cluster上。

    /**
     * Executes the JobGraph of the on a mini cluster of CLusterUtil with a user
     * @return The result of the job execution, containing elapsed time and accumulators.
     */
    @Override
    public JobExecutionResult execute(String jobName) throws Exception {
        /* 从StreamTransformations构建拓扑
        *FlinkPlan 接口 <-- StreamingPlan 抽象类 <-- StreamGraph 
        */
        StreamGraph streamGraph = getStreamGraph();
        streamGraph.setJobName(jobName);
        // JobGraph ,代表 JobManager接受的 Flink dataflow,是在底层的
        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.setAllowQueuedScheduling(true);

        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        configuration.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "0");

        // add (and override) the settings with what the user defined
        configuration.addAll(this.configuration);

        if (!configuration.contains(RestOptions.PORT)) {
            configuration.setInteger(RestOptions.PORT, 0);
        }

        int numSlotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());

        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
            .setConfiguration(configuration)
            .setNumSlotsPerTaskManager(numSlotsPerTaskManager)
            .build();

        if (LOG.isInfoEnabled()) {
            LOG.info("Running job on local embedded Flink mini cluster");
        }
        // 创建 执行Flink Job的MiniCluster        
        MiniCluster miniCluster = new MiniCluster(cfg);

        try {
            miniCluster.start();
            configuration.setInteger(RestOptions.PORT, miniCluster.getRestAddress().getPort());

            return miniCluster.executeJobBlocking(jobGraph);
        }
        finally {
            transformations.clear();
            miniCluster.close();
        }
    }

Flink 运行时,会把流拓扑(Source、Transformation、Sink组成)转换为DataFlow(由Stream和 Operator算子组成)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,200评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,526评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,321评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,601评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,446评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,345评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,753评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,405评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,712评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,743评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,529评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,369评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,770评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,026评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,301评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,732评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,927评论 2 336

推荐阅读更多精彩内容