大数据项目之仿天猫数据分析

简介

项目简介

此项目是实现仿大数据项目流程,包括,日志收集传输,日志格式化,数据实时分析,数据持久化到HDFS,数据离线报表统计,离线任务调度,日志记录搜索几大部分

首先说一下这个项目的大致流程,这个项目是仿天猫数据分析,是自己模拟用户购买浏览商品,生成日志,对日志进行收集,然后分两部分,一部分对数据实时分析,生成日用户活跃地理位置,第二部分是数据持久化之后,T+1对数据进行分析,统计各省销量以及各省活跃用户数

项目数据流

  1. 日志数据生成

    日志格式: requestid, ts, userid, 城市,经度,纬度,操作(浏览,购买)

  2. flume采集日志数据进入kafka log topic

  3. kafkastream消费log topic日志,写入process topic

    对数据进行格式化处理,以及过滤数据。格式化后的数据格式: requestid, ts, userid, 城市,经度,纬度,操作

  4. 实时模块

    sparkstreaming处理process topic日志,扔进realtime topic
    格式化数据,写到process topic中,得到城市和用户id

  5. flume采集process topic数据,写入hdfs

    持久化日志到hdfs中

  6. report模块(离线处理模块)

    T+1离线处理模块,spark计算hdfs中的数据,统计前一天的各省销售记录,写入mysql

  7. azkaban调度任务

  8. ElasticSearch查询历史记录

  9. web页面实时展示活跃用户,和报表页面

然而。。电脑垃圾,扛不住,只做了一部分

azkaban,elasticsearch,web页面没有做

项目构成

  1. logbuilder

模拟日志生成(后面为了方便,写了一个shell用于日志生成)

  1. kafkastream

    kafkastream清洗日志

  2. sparkstream

    sparkstream实时处理日志显示操作用户的地理位置

  3. report

    T+1报表项目,批处理日志,分析各省销售量对比,写入mysql

实现

我们按照数据流的方式来写实现

1. 日志生成

使用SpringBoot做了一个日志生成器。模拟生成日志,日志格式

logger:>>>> requestid,userid,ts,城市,经度,维度,操作(0浏览 1购买)
public static void main(String[] args) {

        SpringApplication.run(LogBuilderApplication.class);

        HashMap map = new HashMap<Integer, String[]>();

        // 存储城市,经纬度
        map.put(0,new String[]{"海门","121.15","31.89"});
        map.put(1,new String[]{"盐城","120.13","33.38"});
        map.put(2,new String[]{"上海","121.48","31.22"});
        map.put(3,new String[]{"厦门","118.1","24.46"});
        Random random = new Random();

        while (true){
            int i1 = random.nextInt(map.size());
            String[] o = (String[]) map.get(i1);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long ts = System.currentTimeMillis();
            String requestid = UUID.randomUUID().toString();

            int userid = random.nextInt(100000);
            // requestid,userid, 城市,经度,纬度,操作(浏览,购买)0 浏览 1 购买
            logger.info("logger:>>>>{},{},{},{},{},{},{}",requestid, ts, userid, o[0],o[1],o[2],random.nextInt(2));
        }
    }

由于不想打包,重启,就自己用shell写了一个日志生成器,更好使用

#!/bin/bash
#二维数组
city=('海门 121.15 31.89' '盐城 120.13 33.38' '上海 121.48 31.22' '厦门 118.1 24.46')
#city=('海门 121.15" "31.89"' '"盐城" "120.13" "33.38"' '"上海" "121.48" "31.22"' '"厦门" "118.1" "24.46"')
#获取数组长度
echo ${#city[@]}
len=$((${#city[@]}-1))
echo $len
#死循环,随机数0-数组长度
while :
do
    rand=`shuf -i0-${len} -n1`
    echo $rand
    echo ${city[${rand}]}
 c=(${city[${rand}]})
ci=${c[0]}
cj=${c[1]}
cw=${c[2]}
  req=$(cat /proc/sys/kernel/random/uuid)
date=`date -d 'day' +%Y%m%d`
ts=`date -d $date +%s`
userid=`shuf -i1-100000 -n1`
action=`shuf -i0-1 -n1`
    echo "logger:>>>>$req,$ts,$userid,${ci},${cj},${cw},${action}"
    sleep 3
done

2. 日志采集

日志采集使用Flume,将日志文件数据采集到Kafka中,

# 监控本地文件写到kafka
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#设置source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/test_flume_file/test_flume_file

#设置Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = test
a1.sinks.k1.kafka.bootstrap.servers = 192.168.33.4:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy

#设置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#连接
a1.sources.r1.channels = c1
#注意,是channel
a1.sinks.k1.channel = c1

3. KafkaStream进行数据清洗

这一步主要是对日志数据进行清洗,过滤掉不符合规范的日志,

过滤后的日志

requestid,userid,ts,城市,经度,维度,操作(0浏览 1购买)
 public static void main(String[] args) {

        String from = "log";
        String to = "process";
        Properties properties = new Properties();

        // 设置application id
        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"stream-tmall");
        // 设置kafka地址
        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092");

        // 创建拓扑结构
        Topology topology = new Topology();
        // 构建数据来源,数据处理逻辑,数据去向
        topology.addSource("SOURCE",from)
                .addProcessor("PROCESS", ()-> new LogProcesser(), "SOURCE")
                .addSink("SINK", to, "PROCESS");
        //过时
//        TopologyBuilder builder = new TopologyBuilder();
//        builder.addSource("SOURCE",from)
//                .addProcessor("PROCESS", ()->new LogProcesser(), "SOURCE")
//                .addSink("SINK",to);
//
//
//        // 创建kafkastream
        KafkaStreams streams = new KafkaStreams(topology,properties);
        // 开启流处理
        streams.start();
        System.out.println("kafkaStream is start!!!");


    }
    
    
public class LogProcesser implements Processor<byte[],byte[]> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {

        this.context = context;
    }

    @Override
    public void process(byte[] key, byte[] value) {

        // 核心流程,处理日志
        String line = new String(value);
        if (line.contains("logger:>>>>")){
            System.out.println("LogProcess process data:" + line);
            String[] split = line.split("logger:>>>>");
            // 转发
            context.forward("LogProcess".getBytes(), split[1].trim().getBytes());
        }
        context.commit();
    }

    @Override
    public void punctuate(long timestamp) {

    }

    @Override
    public void close() {

    }
}

将Kafka log topic中的数据消费,将日志数据进行格式化,写到Process topic中

4. SparkStreaming数据实时处理

我们实时部分,是消费Kafka中process topic中的数据,每一条数据都是今天用户的操作,所以我们将每一条日志的城市取出来,放到realtime topic中,等待后序消费,把数据推送到前端进行实时展示(这部分没做,页面不好写)

public class SparkStreamingProcesser {

    private static final String brokers = "192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092";
    private static final String group_id = "tmall_online";
    private static final List<String> topic = new ArrayList<String>(Arrays.asList("process"));

    private static final String toTopic = "realtime";


    public static void main(String[] args) {
        //1. 得到spark上下文
        SparkConf conf = new SparkConf().setAppName("tmall_online").setMaster("local[*]");

        //2. 创建sparkstreamingcontext。每隔2钟会处理一次收集到的数据
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        //3. 创建kafka的参数
        HashMap<String, Object> kafkaParams = new HashMap<String, Object>();

        // 设置kafka集群地址
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        // 设置消费者组
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        // 设置key反序列化类
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);


        // 4. 通过参数创建一个kafka stream
        JavaInputDStream<ConsumerRecord<Object, Object>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topic, kafkaParams));

        // 5.获取数据并处理
        JavaDStream<String> map = stream.map(msg-> msg.value().toString());


        JavaDStream<Object> map1 = map.map(x -> {
            KafkaProducerUtils producerUtils = new KafkaProducerUtils();
            String[] split = x.split(",");
            System.out.println("接收到:" + split[1]+","+split[2]);
            producerUtils.sendMessage(toTopic, split[3]+","+split[4]+","+split[5]);
            return "a";
        });
        map1.print();


        jssc.start();

        try {
            jssc.awaitTermination();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            jssc.close();
        }

    }


}

5. 日志持久化

我们需要将用户操作日志持久化到HDFS中,我们将格式化之后的数据采集到HDFS,所以我们使用Flume将Kafka中process topic的数据采集到HDFS

# 监控kafka写到hdfs
a1.sources = r1
a1.channels = c1
a1.sinks = k1

#设置source

a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 500
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = 192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092
a1.sources.r1.kafka.topics = process
a1.sources.r1.kafka.consumer.group.id = c_flume

#设置Sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /user/customfile/tmall/dt=%Y%m%d
a1.sinks.k1.hdfs.filePrefix = hadoop1_%H_events_
a1.sinks.k1.hdfs.fileSuffix=.log
a1.sinks.k1.hdfs.rollSize = 102400000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.idleTimeout = 5400
#是否使用本地时间戳
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#设置文件相关 文件类型为纯文本
a1.sinks.k1.hdfs.fileType = DataStream
#设置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 1000

#连接
a1.sources.r1.channels = c1
#注意,是channel
a1.sinks.k1.channel = c1

6. 报表项目

当每天数据采集到HDFS之后,我们需要T+1处理这些日志,产生报表,我们这里主要产生两个指标

  1. 各省活跃用户
  2. 各省销售量

这里我们使用Scala编写Spark程序,做报表,并没有写到mysql,将csv数据写到hdfs中,后序可以导入mysql,电脑带不起了

object ReportStatistics {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("report_statistics")
      .enableHiveSupport()
      .getOrCreate()


    val date = args(0)
    //1. 读取前一天数据

    import spark.implicits._

    val df = spark.sparkContext.textFile(s"/user/custom/tmall/dt=$date")
      .filter(x=>{
        val strings: Array[String] = x.split(",")
        strings.length == 7
      })
      .map(x=>{
        val strings: Array[String] = x.split(",")
        (strings(0),strings(1),strings(2),strings(3),strings(6))
      }).toDF("requestid", "ts", "userid", "provice", "action")


    df.cache().createOrReplaceTempView("t1")
    // 统计各省用户日活
    spark.sql(
      s"""
         |select count(distinct(userid)) as users,provice from t1 group by provice
         |union all
         |select count(distinct(userid)) as users,"all" as provice
       """.stripMargin).repartition(1).write
      .mode(SaveMode.Overwrite).csv(s"/user/data/reporting/tmall/dt=$date/users")

    // 统计各省销售量

    spark.sql(
      s"""
         |select count(requestid) as cnt,provice from t1 where action=0 group by provice
         |union all
         |select count(requestid) as cnt from t1 where action=0
       """.stripMargin).repartition(1).write
      .mode(SaveMode.Overwrite).csv(s"/user/data/reporting/tmall/dt=$date/sales")
  }

}

总结

项目现在就是做到现在这样,后面还有Azkaban任务调度,Flume将数据采集到ElasticSearch中,用于试试查询,还有web显示页面。

显示的话,报表基本做完,一个web项目读取mysql展示就行,如果有想完善的,可以完善一下,

项目中遇到的问题

  1. Flume将数据采集到HDFS的时候产生了大量的小文件,后面调整了配置参数

源码看这里: https://gitee.com/zhangqiye/tmall

如有问题,可以加群: 552113611

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容