1. 启动并登录NiFi
1.1 在本机启动nifi
./bin/nifi.sh start
1.2 登录nifi
打开浏览器,访问xxx.xxx.xxx.xxx:8080(默认端口是8080)
2. 构建Processor
2.1 ExecuteSQL
拖一个Processor到面板中,选择ExecuteSQL类型
在processor上点击右键-->选择configure-->选择properties
选择Database Connection Pooling Service属性值为:DBCPConnectionPool,然后再点击右侧的箭头,进入项目的Configuration界面,配置数据库连接的具体信息。
在项目的Configuration页面中,选择新生成的DBCPConnectionPool条目,点击右侧的齿轮按钮,进行相关配置。
依次填写:
-
Database Connection URL:
数据库连接字符串,如:
jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true -
Database Driver Class Name:
数据库连接驱动类,这里是:com.mysql.jdbc.Driver
-
Database Driver Location(s):
数据库连接驱动jar包的位置,如:/home/hadoop/nifi/mysql-connector-java-5.1.39-bin.jar
-
Database User:
数据库用户名,如:root
-
Password:
数据库密码,如:root
回到ExecuteSQL的properties,设置SQL select query为想要查询的SQL语句,设置Max Wait Time为10 seconds。
2.2 ConvertAvroToJson
从ExecuteSQL里出来的是avro格式的数据,要先将其转化成json格式,再导入HBase。拖一个ConvertAvroToJson Processor到界面。然后,从ExecuteSQL连一条线到ConvertAvroToJson,关系为success。
2.3 SplitJson
从上一步输出的数据是由多条记录构成的整体,需要将其分割成独立的单条数据,再依次导入HBase。
拖入一个SplitJson processor到界面中,然后从ConvertAvroToJson连一条线到SplitJson,关系为success。
配置SplitJson,在properties页,将JsonPath Expression设置为*
2.4 PutHbaseJson
这一步将分割好的json格式的Mysql记录添加到HBase中。
拖入一个PutHbaseJson processor到界面中,右键点击,选择configure,进行配置。
在properties页,配置HBase Client Service为 HBase_1_1_2_ClientService
点击右侧的小箭头,增加HbaseClientService,然后点击齿轮按钮,在properites中依次设置:
-
ZooKeeper Quorum :
Zookeeper的地址,如:192.168.2.xxx
-
Zookeeper client port:
Zookeeper的端口,一般是:2181
-
Zookeeper znode parent:
Zookeeper中的znode,一般是:/hbase
配置完成后,点击小闪电,让配置生效
回到PutHbaseJson的properties,配置RowIdentifier。依次设置:
-
Table Name:
数据导入到HBase的表名称,需要是已存在的表
-
Row Identifier Field Name:
作为HBase表行键的字段名称,一般设置为mysql中的主键
-
Row Identified Encoding Strategy:
行键编码方式,根据行键类型选择String或Binary
-
Column Famlily:
数据导入到HBase表的列族名称
从SplitJosn连一条线到PutHbaseJson,关系为split
2.5 LogAttribute
拖入一个LogAttribute到界面中,从其他所有processor连一条线到LogAttribute,关系全部选则为failure,其中splitJson的original关系也连接到LogAttribute。
接下来定义数据流的终止方式:
(1)点击LogAttribute的confiure,在setting标签中,勾选右侧Automatically Terminate Relationships的success选项。
(2)点击PutHbaseJson的confiure,在setting标签中,勾选右侧Automatically Terminate Relationships的success选项。
3. 检查并开始导入
完成以上步骤后,整体结构图类似于下图所示:
点击空白处,再点击左侧Operate面板的齿轮按钮,检查数据库连接
确保mysql与hbase的连接都处于enable状态,如果没有,则可以查看左侧的错误提示,根据提示修改错误。
最后,根据需要设置ExecuteSQL processor的scheduling选项,默认的执行间隔是0秒,即不间断的执行SQL语句,会导致从Mysql中读出大量重复数据。如果仅仅需要将一次SQL查询的结果导入HBase,建议将该值设置大一些,等待执行完毕后手动结束即可;如果需要定期执行,则应设置合适的执行间隔时间。
以上检查完成后,在Operate面板中点击run按钮,开始进行从mysql到hbase的转换,转换完成后,点击stop按钮停止转换。也可单独控制每一个Processor的启动与停止状态,以便调试。