引言
场景
数仓Hive中的数据需要读取后写入Kafka中进行数据服务输出。
选型
选用Flink进行读Hive写Kafka,因为其拥有丰富的connector可选择。
开发
pom依赖
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>1.13.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.11</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
<configuration>
<skipSource>true</skipSource>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<archive>
<!--确定主类-->
<manifest>
<mainClass>com.test.demo.flinkhive2kafka.job.Hive2Kafka</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
job类
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.catalog.hive.HiveCatalog;
import lombok.extern.slf4j.SLf4j;
@Slf4j
public class Hive2Kafka {
public static void main(String[] args) {
// 设置flink sql环境
EnvironmentSettings environmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
// 创建table环境
TableEnvironment tableEnvironment = TableEnvironment.create(environmentSettings);
// 设置配置
tableEnvironment.getConfig().getConfiguration().setString("table.exec.hive.fallback-mapred-reader", "true")
// 获取外部配置
ParameterTool parameterTool = ParameterTool.fromArgs(args);
log.info("parameters size: {}", parameterTool.getNumberOfParameters());
// 获取所有配置
String hiveCatalogName = parameterTool.get("hive.catalog.name");
String hiveConfDir = parameterTool.get("hive.conf.dir");
String hiveDatabaseName = parameterTool.get("hive.db.name");
String hiveKafakaTable = parameterTool.get("hive.kafka.tb");
String kafkaBootstrapServer = parameterTool.get("kafka.bootstrap.server");
String kafkaTopic = parameterTool.get("kafka.topic");
String kafkaGroupId = parameterTool.get("kafka.group.id");
String kafkaUsername = parameterTool.get("kafka.username");
String kafkaPassword = parameterTool.get("kafka.password");
String insertKafkaTableSql = parameterTool.get("insert.kafka.table.sql");
// 创建hive catalog
HiveCatalog hiveCatalog = new HiveCatalog(hiveCatalogName, hiveDatabaseName, hiveConfDir);
// 注册catalog
tableEnvironment.registerCatalog(hiveCatalogName, hiveCatalog);
// 使用catalog
tableEnvironment.useCatalog(hiveCatalogName);
String createKafkaTableSql = String.format("CREATE TABLE IF NOT EXISTS %s(`field01` STRING) \n" +
"WITH('connector' = 'kafka', \n" +
"'topic' = '%s', \n" +
"'properties.group.id' = '%s', \n" +
"'properties.bootstrap.servers' = '%s', \n" +
"'scan.startup.mode' = 'group-offsets', \n" +
"'properties.auto.offset.reset' = 'earliest', \n" +
"'format' = 'raw', \n" +
"'properties.security.protocol' = 'SASL_PLAINTEXT', \n" +
"'properties.sasl.mechanism' = 'PLAIN', \n" +
"'properties.sasl.mechanism' = 'org.apache.kafka.common.security.plain.PlainLoginModule " +
"required username = \"%s\" password=\"%s\";'\n" +
")",hiveKafkaTable, kafkaTopic, kafkaGroupId, kafkaBootstrapServer, kafkaUsername, kafkaPassword);
// 创建kafka表
tableEnvironment.executeSql(createKafkaTableSql).print();
// 执行flink sql
tableEnvironment.executeSql(insertKafkaTableSql).print();
}
}
执行
使用yarn-application
模式
./fkink run-application -t yarn-application flink-hive-2-kafka-1.0.jar --hive.db.name xxx --hive.kafka.tb xxx --kafka.bootstrap.server xxx:9092,xxx:9092 --kafka.topic xxx --kafka.group.id xxx --kafka.username xxx --kafka.password 'xxx' --sql.insert.kafka.table 'xxxxxxx'