Scenario: Flume consumes JSON data from Kafka and writes it into a Hive table
Problem encountered: the RecordWriter class cannot be found:
java.lang.NoClassDefFoundError: org/apache/hive/hcatalog/streaming/RecordWriter
Fix: copy hive-hcatalog-streaming-3.1.2.jar into Hive's lib directory:
cp /opt/module/hive/hcatalog/share/hcatalog/hive-hcatalog-streaming-3.1.2.jar /opt/module/hive/lib/
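A quick sanity check that the jar landed (same path as above):
ls /opt/module/hive/lib/ | grep hcatalog-streaming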
Create the Hive table
First enable transaction support in the Hive session:
set hive.support.concurrency=true;
set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
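Note that these set commands only apply to the current session. For a continuously running Flume sink, the same properties, plus the compaction settings that Hive ACID tables rely on (e.g. hive.compactor.initiator.on=true and hive.compactor.worker.threads=1; double-check the exact property names against your Hive version), are usually made permanent in hive-site.xml on the metastore host.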
Now create the table. Note: the table columns must correspond to the JSON fields sent to Kafka (see the sample record after the DDL):
create table boxconfigure(
dev_id string,
area_id string,
factory_id string,
workshop_id string,
`time` string,
collection_name string)
clustered by (workshop_id) into 3 buckets
stored as orc tblproperties('transactional'='true');
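For reference, a Kafka message matching this schema would look like the following (all values invented for illustration):
{"dev_id":"dev-001","area_id":"area-01","factory_id":"factory-01","workshop_id":"workshop-01","time":"2021-06-01 12:00:00","collection_name":"boxconfigure"}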
Write kafka-hive.conf
a1.channels = c1
a1.sinks = k1
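# note: no source is defined; with a Kafka channel the channel itself consumes from the topic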
# Kafka channel configuration
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = 127.0.0.1:9092
a1.channels.c1.kafka.topic = hotitems
a1.channels.c1.kafka.group.id = flume
#a1.channels.c1.kafka.auto.offset.reset=latest
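# false because the topic carries plain JSON strings rather than Avro-encoded Flume events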
a1.channels.c1.parseAsFlumeEvent = false
# Hive sink configuration
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://hadoop102:9083
a1.sinks.k1.hive.database = default
a1.sinks.k1.hive.table = boxconfigure
#a1.sinks.k1.hive.partition = asia
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
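# with serializer = JSON the sink should map JSON fields to columns by name;
# the delimiter/serdeSeparator/fieldnames settings below apply to the DELIMITED serializer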
#a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer = JSON
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames = dev_id,area_id,factory_id,workshop_id,time,collection_name
Start Flume
bin/flume-ng agent -c conf/ -f conf/kafka-hive.conf -n a1 -Dflume.root.logger=INFO,console
Send test data to Kafka:
import com.alibaba.fastjson.JSONArray;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducer {
    public static void main(String[] args) throws Exception {
        writeToKafka("hotitems");
    }

    public static void writeToKafka(String topic) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Define a producer; the fully qualified name is required because this
        // class shadows org.apache.kafka.clients.producer.KafkaProducer
        org.apache.kafka.clients.producer.KafkaProducer<String, String> kafkaProducer =
                new org.apache.kafka.clients.producer.KafkaProducer<>(properties);
        // JSONArray is assumed to be fastjson's, which supports add() and iteration
        JSONArray jsonArray = new JSONArray();
        // build test data...
        // jsonArray.add(json);
        for (Object o : jsonArray) {
            // every record goes to partition 0 with a fixed key
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, 0, "key", o.toString());
            kafkaProducer.send(producerRecord);
        }
        System.out.println("Done sending data!");
        kafkaProducer.close();
    }
}
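The data-construction step above is left elided. As a minimal sketch, one record matching the boxconfigure columns could be built like this inside writeToKafka (assuming fastjson's JSONObject; field values invented for illustration):

JSONObject json = new JSONObject();
json.put("dev_id", "dev-001");
json.put("area_id", "area-01");
json.put("factory_id", "factory-01");
json.put("workshop_id", "workshop-01");
json.put("time", "2021-06-01 12:00:00");
json.put("collection_name", "boxconfigure");
jsonArray.add(json);

Once Flume commits a transaction batch, the rows can be checked from the Hive CLI:

select * from boxconfigure limit 10;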