Introduction
Project Overview
This project implements a simulated big-data pipeline, covering log collection and transport, log formatting, real-time data analysis, data persistence to HDFS, offline report statistics, offline job scheduling, and log record search.
First, an outline of the overall flow. The project mimics Tmall data analysis: it simulates users browsing and purchasing goods, generates logs, and collects them. The pipeline then splits into two parts: one analyzes the data in real time to produce the geographic locations of the day's active users; the other persists the data and runs T+1 analysis to count sales volume and active users per province.
Project Data Flow
- Log data generation
  Log format: requestid, ts, userid, city, longitude, latitude, action (browse, purchase)
  Flume collects the log data into the Kafka log topic
- Kafka Streams consumes the log topic and writes to the process topic
  The data is formatted and filtered. Format after cleaning: requestid, ts, userid, city, longitude, latitude, action
- Real-time module
  Spark Streaming processes the process topic logs and pushes them into the realtime topic
  The data is formatted and the city and user id are extracted
- Flume collects the process topic data and writes it to HDFS
  The logs are persisted in HDFS
- Report module (offline processing)
  T+1 offline processing: Spark computes over the data in HDFS, aggregates the previous day's sales records per province, and writes the results to MySQL
- Azkaban schedules the jobs
- ElasticSearch serves queries over historical records
- A web page shows active users in real time, plus a report page
However... my machine is too weak to handle it all, so only part of this was built.
Azkaban, ElasticSearch, and the web pages were not implemented.
Project Components
- logbuilder
  Simulates log generation (later, for convenience, a shell script was written to generate the logs)
- kafkastream
  Kafka Streams cleans the logs
- sparkstream
  Spark Streaming processes the logs in real time to show the geographic locations of active users
- report
  T+1 report project: batch-processes the logs, compares sales volume across provinces, and writes to MySQL
Implementation
The implementation is described in data-flow order.
1. Log generation
A log generator was built with Spring Boot to simulate log output. Log format:
logger:>>>> requestid, ts, userid, city, longitude, latitude, action (0 browse, 1 purchase)
public static void main(String[] args) {
    SpringApplication.run(LogBuilderApplication.class);
    // City name, longitude, latitude
    HashMap<Integer, String[]> map = new HashMap<>();
    map.put(0, new String[]{"海门", "121.15", "31.89"});
    map.put(1, new String[]{"盐城", "120.13", "33.38"});
    map.put(2, new String[]{"上海", "121.48", "31.22"});
    map.put(3, new String[]{"厦门", "118.1", "24.46"});
    Random random = new Random();
    while (true) {
        // Pick a random city
        int i1 = random.nextInt(map.size());
        String[] o = map.get(i1);
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long ts = System.currentTimeMillis();
        String requestid = UUID.randomUUID().toString();
        int userid = random.nextInt(100000);
        // requestid, ts, userid, city, longitude, latitude, action (0 browse, 1 purchase)
        logger.info("logger:>>>>{},{},{},{},{},{},{}", requestid, ts, userid, o[0], o[1], o[2], random.nextInt(2));
    }
}
Since repackaging and restarting for every change was inconvenient, a shell log generator was also written, which is easier to use:
#!/bin/bash
# City array: name, longitude, latitude
city=('海门 121.15 31.89' '盐城 120.13 33.38' '上海 121.48 31.22' '厦门 118.1 24.46')
# Array length
echo ${#city[@]}
len=$((${#city[@]}-1))
echo $len
# Loop forever, picking a random index between 0 and the array length
while :
do
    rand=`shuf -i0-${len} -n1`
    echo $rand
    echo ${city[${rand}]}
    # Split the chosen entry into city, longitude, latitude
    c=(${city[${rand}]})
    ci=${c[0]}
    cj=${c[1]}
    cw=${c[2]}
    # Random request id, timestamp, user id and action
    req=$(cat /proc/sys/kernel/random/uuid)
    date=`date -d 'day' +%Y%m%d`
    ts=`date -d $date +%s`
    userid=`shuf -i1-100000 -n1`
    action=`shuf -i0-1 -n1`
    echo "logger:>>>>$req,$ts,$userid,${ci},${cj},${cw},${action}"
    sleep 3
done
2. Log collection
Log collection uses Flume, which ships the log file data into Kafka:
# Tail a local file and write to Kafka
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/test_flume_file/test_flume_file
# Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = 192.168.33.4:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Wiring: note that sources use "channels" but sinks use "channel" (singular)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
3. Data cleaning with Kafka Streams
This step cleans the log data, filtering out records that do not match the expected format.
Format of the cleaned logs:
requestid, ts, userid, city, longitude, latitude, action (0 browse, 1 purchase)
public static void main(String[] args) {
    String from = "log";
    String to = "process";
    Properties properties = new Properties();
    // Application id
    properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-tmall");
    // Kafka broker addresses
    properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092");
    // Build the topology
    Topology topology = new Topology();
    // Wire up the data source, the processing logic, and the sink
    topology.addSource("SOURCE", from)
            .addProcessor("PROCESS", () -> new LogProcesser(), "SOURCE")
            .addSink("SINK", to, "PROCESS");
    // Deprecated alternative:
    // TopologyBuilder builder = new TopologyBuilder();
    // builder.addSource("SOURCE", from)
    //        .addProcessor("PROCESS", () -> new LogProcesser(), "SOURCE")
    //        .addSink("SINK", to);

    // Create the Kafka Streams instance
    KafkaStreams streams = new KafkaStreams(topology, properties);
    // Start stream processing
    streams.start();
    System.out.println("kafkaStream started!!!");
}
public class LogProcesser implements Processor<byte[], byte[]> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(byte[] key, byte[] value) {
        // Core logic: keep only lines carrying the logger prefix and strip it off
        String line = new String(value);
        if (line.contains("logger:>>>>")) {
            System.out.println("LogProcess process data:" + line);
            String[] split = line.split("logger:>>>>");
            // Forward the cleaned record downstream
            context.forward("LogProcess".getBytes(), split[1].trim().getBytes());
        }
        context.commit();
    }

    @Override
    public void punctuate(long timestamp) {
    }

    @Override
    public void close() {
    }
}
This consumes the data from the Kafka log topic, formats the log records, and writes them into the process topic.
4. Real-time processing with Spark Streaming
The real-time part consumes the data in the Kafka process topic. Every record is an operation a user performed today, so the city in each log record is extracted and put into the realtime topic, to be consumed later and pushed to the front end for live display (that part was not built; the page is hard to write).
public class SparkStreamingProcesser {
    private static final String brokers = "192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092";
    private static final String group_id = "tmall_online";
    private static final List<String> topic = new ArrayList<String>(Arrays.asList("process"));
    private static final String toTopic = "realtime";

    public static void main(String[] args) {
        // 1. Spark context
        SparkConf conf = new SparkConf().setAppName("tmall_online").setMaster("local[*]");
        // 2. Streaming context: the collected data is processed in 10-second batches
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        // 3. Kafka parameters
        HashMap<String, Object> kafkaParams = new HashMap<String, Object>();
        // Kafka broker addresses
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // Consumer group
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        // Key/value deserializers
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 4. Create a direct Kafka stream from the parameters
        JavaInputDStream<ConsumerRecord<Object, Object>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topic, kafkaParams));
        // 5. Extract the fields and forward them to the realtime topic
        JavaDStream<String> map = stream.map(msg -> msg.value().toString());
        JavaDStream<Object> map1 = map.map(x -> {
            KafkaProducerUtils producerUtils = new KafkaProducerUtils();
            String[] split = x.split(",");
            System.out.println("received: " + split[1] + "," + split[2]);
            // city, longitude, latitude
            producerUtils.sendMessage(toTopic, split[3] + "," + split[4] + "," + split[5]);
            return "a";
        });
        map1.print();
        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            jssc.close();
        }
    }
}
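The KafkaProducerUtils helper used above is not shown in this section. A minimal sketch, assuming it simply wraps a plain KafkaProducer with String serializers and the same broker list (the class body below is an assumption, not the project's actual code):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerUtils {
    private final KafkaProducer<String, String> producer;

    public KafkaProducerUtils() {
        Properties props = new Properties();
        // Assumed to use the same broker list as the streaming job
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.producer = new KafkaProducer<>(props);
    }

    public void sendMessage(String topic, String message) {
        // Fire-and-forget send; flush so nothing is lost if the helper is short-lived
        producer.send(new ProducerRecord<>(topic, message));
        producer.flush();
    }
}
Note that the streaming code constructs a new KafkaProducerUtils for every record, so each call opens its own producer connection; creating the helper once per partition (for example inside foreachPartition) would be cheaper, but the sketch keeps the call signature used above.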
5. Log persistence
The user operation logs need to be persisted to HDFS, and it is the formatted data that should be archived, so Flume is used to collect the Kafka process topic into HDFS:
# Consume from Kafka and write to HDFS
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# Source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 500
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092
a1.sources.r1.kafka.topics = process
a1.sources.r1.kafka.consumer.group.id = c_flume
# Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/customfile/tmall/dt=%Y%m%d
a1.sinks.k1.hdfs.filePrefix = hadoop1_%H_events_
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.rollSize = 102400000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.idleTimeout = 5400
# Use the local timestamp for the %Y%m%d / %H escapes
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Write plain text files
a1.sinks.k1.hdfs.fileType = DataStream
# Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000
# Wiring: note that sources use "channels" but sinks use "channel" (singular)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
6. Report project
Once each day's data has been collected into HDFS, the logs are processed T+1 to produce reports. Two metrics are produced here:
- Active users per province
- Sales volume per province
The report job is a Spark program written in Scala. It does not actually write to MySQL; instead the CSV results are written to HDFS and can be imported into MySQL later (my machine could not handle any more).
object ReportStatistics {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("report_statistics")
      .enableHiveSupport()
      .getOrCreate()
    val date = args(0)
    // 1. Read the previous day's data
    import spark.implicits._
    val df = spark.sparkContext.textFile(s"/user/custom/tmall/dt=$date")
      .filter(x => {
        // Keep only well-formed records with all 7 fields
        val strings: Array[String] = x.split(",")
        strings.length == 7
      })
      .map(x => {
        val strings: Array[String] = x.split(",")
        (strings(0), strings(1), strings(2), strings(3), strings(6))
      }).toDF("requestid", "ts", "userid", "province", "action")
    df.cache().createOrReplaceTempView("t1")
    // Daily active users per province, plus an "all" total
    spark.sql(
      s"""
         |select count(distinct(userid)) as users, province from t1 group by province
         |union all
         |select count(distinct(userid)) as users, "all" as province from t1
      """.stripMargin).repartition(1).write
      .mode(SaveMode.Overwrite).csv(s"/user/data/reporting/tmall/dt=$date/users")
    // Sales volume per province (action = 1 means purchase), plus an "all" total
    spark.sql(
      s"""
         |select count(requestid) as cnt, province from t1 where action = 1 group by province
         |union all
         |select count(requestid) as cnt, "all" as province from t1 where action = 1
      """.stripMargin).repartition(1).write
      .mode(SaveMode.Overwrite).csv(s"/user/data/reporting/tmall/dt=$date/sales")
  }
}
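Since the CSV results are meant to be imported into MySQL afterwards, one option is a small Spark job that reads the report output and writes it over JDBC. The sketch below is only an illustration: the JDBC URL, database, table name, and credentials are placeholder assumptions, not part of the original project.
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class ReportToMysql {
    public static void main(String[] args) {
        String date = args[0];
        SparkSession spark = SparkSession.builder().appName("report_to_mysql").getOrCreate();

        // Read the per-province sales CSV produced by the report job
        Dataset<Row> sales = spark.read().csv("/user/data/reporting/tmall/dt=" + date + "/sales")
                .toDF("cnt", "province");

        // Placeholder connection details; adjust to the real MySQL instance
        Properties props = new Properties();
        props.put("user", "root");
        props.put("password", "123456");
        props.put("driver", "com.mysql.jdbc.Driver");

        sales.write().mode(SaveMode.Append)
                .jdbc("jdbc:mysql://192.168.33.3:3306/tmall_report", "province_sales", props);

        spark.stop();
    }
}
The same approach works for the users output under /user/data/reporting/tmall/dt=$date/users.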
Summary
This is as far as the project has gotten. Still to do are Azkaban job scheduling, Flume shipping the data into ElasticSearch for real-time queries, and the web display pages.
As for display, the report side is basically done; a web project that reads from MySQL and renders the results would be enough. Anyone who wants to finish it out is welcome to.
Problems encountered
- Flume produced a large number of small files when collecting data into HDFS; the roll settings (rollSize, rollCount, rollInterval in the config above) were adjusted afterwards to fix this.
Source code: https://gitee.com/zhangqiye/tmall
For questions, feel free to join the group: 552113611