flink学习之三--引入spring

Flink中引入Spring

一个flink项目中可能存在多个job,不过一般每个job都是一个main方法了事,主要逻辑也在这个main中,如果需要用到别的功能,一般都是直接new,一直在业务中使用的spring并没用到。而在flink中可以用spring么?当然,并没有什么限制,只是需要手动初始化spring容器而已。

在这里主要引入spring管理数据库连接池,而不是直接用jdbc connection处理数据库连接,再将数据库作为flink的数据源。

1、引入依赖

spring相关依赖:

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.1.3.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-tx</artifactId>
            <version>5.1.3.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jdbc</artifactId>
            <version>5.1.3.RELEASE</version>
        </dependency>

mysql驱动

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.13</version>
        </dependency>

使用druid数据库连接池

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.11</version>
        </dependency>

mybatis相关依赖:

       <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis-spring</artifactId>
            <version>1.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis</groupId>
            <artifactId>mybatis</artifactId>
            <version>3.4.0</version>
        </dependency>

2、配置maven插件

如果这里不设置,在本地跑main方法是没啥问题的,不过打包之后上传到flink集群中,会出现找不到spring相关类的异常,配置如下:

<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>myflink.job.KafkaDatasouceForFlinkJob</mainClass>
                                </transformer>

                                    <transformer
                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>META-INF/spring.handlers</resource>
                                    </transformer>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>META-INF/spring.schemas</resource>
                                    </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

这里需要注意的有:

1、maven-shade-plugin:创建项目的时候指定 Flink Quickstart Job类型,会自动在pom.xml添加这个plugin配置。

2、mainClass:指定整个jar包的入口,不过其实关系不大,在上传到flink上之后,可以在submit job的时候再重新指定。

3、AppendingTransformer,这两个配置是关键,会把spring相关的依赖、配置都打包到jar中。

spring配置

beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd"
       default-lazy-init="false">

    <context:annotation-config/>
    <context:component-scan base-package="myflink"/>
    <context:component-scan base-package="myflink.ds"/>
    <context:component-scan base-package="myflink.repository"/>

    <bean id="dataSource" class="com.alibaba.druid.pool.DruidDataSource" init-method="init" destroy-method="close">
        <property name="url" value="jdbc:mysql://localhost:3306/log" />
        <property name="username" value="root" />
        <property name="password" value="root" />

        <property name="filters" value="stat" />

        <property name="maxActive" value="20" />
        <property name="initialSize" value="1" />
        <property name="maxWait" value="60000" />
        <property name="minIdle" value="1" />

        <property name="timeBetweenEvictionRunsMillis" value="60000" />
        <property name="minEvictableIdleTimeMillis" value="300000" />

        <property name="testWhileIdle" value="true" />
        <property name="testOnBorrow" value="false" />
        <property name="testOnReturn" value="false" />

        <property name="poolPreparedStatements" value="true" />
        <property name="maxOpenPreparedStatements" value="20" />

        <property name="asyncInit" value="true" />
    </bean>

    <bean class="org.mybatis.spring.mapper.MapperScannerConfigurer">
        <property name="basePackage" value="myflink.repository"/>
        <property name="sqlSessionFactoryBeanName" value="myBatisSqlSessionFactory"/>
        <property name="annotationClass" value="org.springframework.stereotype.Repository"/>
    </bean>

    <bean id="myBatisSqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean">
        <property name="dataSource" ref="dataSource"/>
        <property name="mapperLocations" value="classpath*:mapper/*Mapper.xml"/>
    </bean>

    <bean id="mySqlTransactionManager"
          class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>

</beans>

如上所示,就是一个普通的spring配置,这里配置了annotation扫描的表、数据库连接池、mybatis相关配置、事务等。

代码中处理

1、初步尝试

目前接触到的所有flink job都是从main方法中开始的,本来打算在main方法中初始化applicationContext,并把applicationContext作为参数传入到数据源中,或者直接将dataSource从spring中当做bean拿到,并放入addSource中,如下:

public class MySqlDSPoolForFlinkJob {

    public static void main(String[] args) throws Exception {

        // 初始化spring容器
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");

        // 获取bean
        MysqlDSWithSpringForFlink mysqlDSWithSpringForFlink = (MysqlDSWithSpringForFlink) applicationContext.getBean("mysqlDsWithSpring");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(mysqlDSWithSpringForFlink).addSink(new PrintSinkFunction<>());

        env.execute("mysql Datasource with pool and spring");
    }
}

但是运行起来的时候,总是如下错:

2019-01-14 21:43:54.729 [main] INFO  com.alibaba.druid.pool.DruidDataSource - {dataSource-1} inited
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The implementation of the RichSourceFunction is not serializable. The object probably contains or references non serializable fields.

即使做了spring容器的初始化,下层中依然无法获取到applicationContext。

所以在main中,依然只能用new的方式获取对应的bean。

2、在具体的业务逻辑中使用

既然在main方法中无法直接传入bean,那就让main方法只是作为一层简单封装,具体的datasource、sink等操作都放在下层,也就是在具体的DataSource实现中进行spring初始化处理。

修改后 main 方法的实现如下:

public class MySqlDSPoolForFlinkJob {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 添加flink数据源
        env.addSource(new MysqlDSWithSpringForFlink()).addSink(new PrintSinkFunction<>());

        env.execute("mysql Datasource with pool and spring");
    }
}

关键是MysqlDSWithSpringForFlink中的实现。

3、flink中的dataSource

flink中的数据源需要实现RichSourceFunction<T>抽象类中的方法,这里的实现如下:

package myflink.ds;

import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import java.util.List;

@Slf4j
@AllArgsConstructor
@NoArgsConstructor
public class MysqlDSWithSpringForFlink extends RichSourceFunction<UrlInfo> implements ApplicationContextAware {

    private UrlInfoManager urlInfoManager;

    private ApplicationContext applicationContext;

    @Override
    public void open(Configuration parameters) throws Exception {
        log.info("------open connection,applicationContext=" + applicationContext);
        super.open(parameters);
        if(applicationContext == null){
            init();
        }

    }

    private void init(){
        // 在这里进行spring的初始化
        applicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml");
        urlInfoManager = (UrlInfoManager) applicationContext.getBean("urlInfoManager");
    }

    @Override
    public void run(SourceContext<UrlInfo> sourceContext) throws Exception {
        log.info("------query ");

        if(urlInfoManager == null){
            init();
        }

        List<UrlInfo> urlInfoList = urlInfoManager.queryAll();
        urlInfoList.parallelStream().forEach(urlInfo -> sourceContext.collect(urlInfo));
    }

    @Override
    public void cancel() {
        log.info("------cancel ");
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}

说明:

a、ApplicationContextAware 接口,表示可以取得applicationContext,避免在多线程的情况下多次初始化spring。

b、这里在open方法中调用初始化spring容器的方法

c、urlInfoManager就直接通过spring管理了,具体的实现放在下文

4、关于RichSourceFunction

RichSourceFunction的继承关系如下:

public abstract class RichSourceFunction<OUT> extends AbstractRichFunction implements SourceFunction<OUT>{}

注意AbstractRichFunction,这个类是RichSourceFunction、RichSinkFunction的父类,也就是Flink中自定义的DataSource、Sink都是来源于这个类。

@Public
public abstract class AbstractRichFunction implements RichFunction, Serializable {
    ......

    public void open(Configuration parameters) throws Exception {
    }

    public void close() throws Exception {
    }
}

这里主要封装了open、close方法,用于初始化数据源链接、关闭数据源链接。

对于SourceFunction,看下其中实现

package org.apache.flink.streaming.api.functions.source;

import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.watermark.Watermark;

@Public
public interface SourceFunction<T> extends Function, Serializable {
    void run(SourceFunction.SourceContext<T> var1) throws Exception;

    void cancel();

    @Public
    public interface SourceContext<T> {
        void collect(T var1);

        @PublicEvolving
        void collectWithTimestamp(T var1, long var2);

        @PublicEvolving
        void emitWatermark(Watermark var1);

        @PublicEvolving
        void markAsTemporarilyIdle();

        Object getCheckpointLock();

        void close();
    }
}

这里主要用到的是run方法,里面是主要的数据源的操作实现。

5、UrlInfo mybatis相关的实现

UrlInfo pojo:

package myflink.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
public class UrlInfo {
    private int id;

    private String url;

    private String hash;
}

UrlInfoRepository:

package myflink.repository;

import myflink.model.UrlInfo;
import org.springframework.stereotype.Repository;

import java.util.List;

@Repository
public interface UrlInfoRepository {

    UrlInfo selectByPrimaryKey(Integer id);

    UrlInfo selectByUrl(String url);

    int insert(UrlInfo urlInfo);

    List<UrlInfo> queryAll();
}

UrlInfoMapper.xml:

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="myflink.repository.UrlInfoRepository">
    <resultMap id="BaseResultMap" type="myflink.model.UrlInfo">
        <id column="id" property="id" jdbcType="INTEGER"/>
        <result column="url" property="url" jdbcType="VARCHAR"/>
        <result column="hash" property="hash" jdbcType="VARCHAR"/>
    </resultMap>

    <sql id="Base_Column_List">
        `id`,
        `url`,
        `hash`
    </sql>

    <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer">
        select
        <include refid="Base_Column_List"/>
        from url_info
        where id = #{id,jdbcType=INTEGER}
    </select>

    <select id="selectByUrl" resultMap="BaseResultMap" parameterType="java.lang.String">
        select
        <include refid="Base_Column_List"/>
        from url_info
        where url = #{url,jdbcType=VARCHAR}
    </select>

    <select id="queryAll" resultMap="BaseResultMap">
        select
        <include refid="Base_Column_List"/>
        from url_info
    </select>

    <insert id="insert" parameterType="myflink.model.UrlInfo">
        insert into url_info
        <trim prefix="(" suffix=")" suffixOverrides=",">
            <if test="url != null">
                `url`,
            </if>
            <if test="hash != null">
                `hash`
            </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides=",">
            <if test="url != null">
                #{url,jdbcType=VARCHAR},
            </if>
            <if test="hash != null">
                #{hash,jdbcType=VARCHAR}
            </if>
        </trim>
    </insert>
</mapper>

UrlInfoManager:

package myflink.manager;

import myflink.model.UrlInfo;

import java.util.List;

public interface UrlInfoManager {

    int insert(UrlInfo urlInfo);

    List<UrlInfo> queryAll();
}

UrlInfoManagerImpl:

package myflink.manager.impl;

import myflink.manager.UrlInfoManager;
import myflink.model.UrlInfo;
import myflink.repository.UrlInfoRepository;
import org.apache.commons.codec.digest.DigestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.List;

@Transactional
@Component("urlInfoManager")
public class UrlInfoManagerImpl implements UrlInfoManager {

    @Autowired
    private UrlInfoRepository urlInfoRepository;

    @Override
    public int insert(UrlInfo urlInfo) {

        urlInfo.setHash(DigestUtils.md5Hex(urlInfo.getUrl()));

        UrlInfo info = urlInfoRepository.selectByUrl(urlInfo.getUrl());
        if(null != info)
        {
            return 0;
        }

        return urlInfoRepository.insert(urlInfo);
    }

    @Override
    public List<UrlInfo> queryAll() {
        return urlInfoRepository.queryAll();
    }
}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容

  • 1.明天早上6:40起来滚狼牙棒,为家人做早饭,虽然为老公和孩子出门满有一些情绪,但很快就恢复了理智,心平气和地想...
    Minna明红阅读 161评论 0 0
  • 背景 在团队协作开发的过程中,有的同事用windows,有的用mac;我本人用的mac。在多人协作开发H5项目(i...
    TroyZhang阅读 1,690评论 0 1
  • 为何进来各种鲜花平台这么火爆?! 有单纯的鲜花预定平台花点时间、Flowerplus等;还有生鲜平台也在做鲜花配送...
    枫荭阅读 485评论 0 1
  • 七弦天音透无声, 夕落红尘幻痴嗔。 快酒一杯君莫醉, 乐往南楼吟西风。
    琉離阅读 130评论 0 0