1.blink在flink的基础上做了大量的优化,其中有两点:
1.1Catalog
在catalog上做了如下修改和优化:
- 通过引入全新的 ReadableCatalog and ReadableWritableCatalog 接口统一了 Flink 的内部和外部 catalog。Flink 所有的 catalog 会被 TableEnvironment 中的 CatalogManager管理。
- 实现了两种新的 catalog - FlinkInMemoryCatalog and HiveCatalog。FlinkInMemoryCatalog 会将所有元数据存在内存中。HiveCatalog 会连接 Hive metastore 并桥接 Flink 和 Hive 之间的元数据。目前,这个HiveCatalog 可以提供读取 Hive 元数据的能力,包括数据库(databases),表(tables),表分区(table partitions), 简单的数据类型(simple data types), 表和列的统计信息(table and column stats)。
- 重新清晰定义了引用目标的层级,即 'mycatalog.mydatabase.mytable'。通过定义默认 catalog 和默认数据库,用户可以将引用层级简单化为 'mytable’。
未来,我们还将加入对更多类型的元数据以及catalog的支持。
1.2Hive兼容性
我们的目标是在元数据(meta data)和数据层将 Flink 和 Hive 对接和打通。
- 在这个版本上,Flink可以通过上面提到的HiveCatalog读取Hive的metaData。
- 这个版本实现了HiveTableSource,使得Flink job可以直接读取Hive中普通表和分区表的数据,以及做分区的裁剪。
通过这个版本,用户可以使用Flink SQL读取已有的Hive meta和data,做数据处理。未来我们将在Flink上继续加大对Hive兼容性的支持,包括支持Hive特有的data type,和Hive UDF等等。
2.如何连接hive 源数据
2.1 代码
通过flink-sql连接外部数据源(比如hive),需要写一些代码声明。
2.2 blink sql-client
3. 环境准备
3.1安装hadoop
参考https://blog.csdn.net/hubin232/article/details/76769265
cd /usr/local/Cellar/hadoop/3.1.1/libexec/sbin
./start-all.sh //即可启动 hadoop namenode,secondnamenode,datanode,resource mananger组件
3.2 安装hive
mac 环境下brew install hive 即可安装最新版 (需要先装mysql或者一个能连的上mysql也行)
3.3 配置
cd /usr/local/Cellar/hive/3.1.1/libexec/conf
cp hive-default.xml.template hive-site.xml
编辑hive-site.xml及后续参考https://www.jianshu.com/p/5c11073d19d3
安装后,建表,插数据。
3.4.metastore server开启
注意!!! 一定要确保hive开启了 metastore server
lsof -i:9083 查询是否开启了。
开启方式有两种
1
/usr/local/Cellar/hive/3.1.1/libexec/hcatalog/sbin/hcat_server.sh start
提示
Started metastore server init, testing if initialized correctly...
Metastore initialized successfully on port[9083].
就说明成功了。
2 hive --service metastore (这个没试过)
lsof -i:9083
3.5 为什么要开启metastore 呢?
blink catalog架构图
红框内就是连接的hive metastore,所以需要先开启 hive 的metastore server。
4. blink源码修改
1. sql-client Environment 类
修改位置:
org.apache.flink.table.client.config.Environment
enrich 方法
enrichedEnv.deployment = DeploymentEntry.enrich(env.deployment, properties);
下方加入
enrichedEnv.catalogs = new HashMap<>(env.catalogs);
这块没有将catalogs复制过去,会导致从环境中读取到的catalogs丢失,用户永远没发定义catalog。
2.hive connector
官方支持的hive版本是2.4,我的是3.1.1,会报错
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in
the classpath.
Reason:
The matching factory 'org.apache.flink.streaming.connectors.hive.HiveTableFactory' doesn't support 'bucketing_version'.
所以要在 flink-connector-hive模块
在类org.apache.flink.table.catalog.hive.config.HiveTableConfig
加入:
public static final String DEFAULT_TABLE_BUCKETING_VERSION = "bucketing_version";
public static final String DEFAULT_TABLE_COLUMN_STATS_ACCURATE = "column_stats_accurate";
在类org.apache.flink.streaming.connectors.hive.HiveTableFactory
supportedProperties 方法加入:
properties.add(HiveTableConfig.DEFAULT_TABLE_BUCKETING_VERSION);
properties.add(HiveTableConfig.DEFAULT_TABLE_COLUMN_STATS_ACCURATE);
修改pom
否则会报找不到HadoopInputFormatCommonBase 这个类。
3 jar包替换。
代码修改后
在sql-client模块mvn clean install -Dmaven.test.skip -Dcheckstyle.skip -Drat.ignoreErrors=true
将flink-sql-client-1.5.1.jar包拷到 /apache-flink/build-target/opt/sql-client
在flink-connector-hive模块 mvn clean install -Dmaven.test.skip -Dcheckstyle.skip -Drat.ignoreErrors=true
将flink-connector-hive_2.11-1.5.1.jar拷到/apache-flink/build-target/opt/connectors
5.应用
5.1配置
进入/apache-flink/build-target/bin
cp ../conf/sql-client-default.ymal sql-client-hive.ymal
修改sql-client-hive.ymal
execution配置:
(streaming模式不支持,应该可以通过修改flink-connector-hive模块代码支持。)
catalogs配置:
5.2.执行./sql-client.sh embedded -e sql-client-hive.yaml
至此就打通blink的 sql-client与 hive源数据了。