15-Flink实战项目之实时热销排行

戳更多文章:

1-Flink入门

2-本地环境搭建&构建第一个Flink应用

3-DataSet API

4-DataSteam API

5-集群部署

6-分布式缓存

7-重启策略

8-Flink中的窗口

9-Flink中的Time

Flink时间戳和水印

Broadcast广播变量

FlinkTable&SQL

Flink实战项目实时热销排行

Flink写入RedisSink

17-Flink消费Kafka写入Mysql

需求

某个图书网站,希望看到双十一秒杀期间实时的热销排行榜单。我们可以将“实时热门商品”翻译成程序员更好理解的需求:每隔5秒钟输出最近一小时内点击量最多的前 N 个商品/图书.

需求分解

将这个需求进行分解我们大概要做这么几件事情:

  • 告诉 Flink 框架基于时间做窗口,我们这里用processingTime,不用自带时间戳
  • 过滤出图书点击行为数据
  • 按一小时的窗口大小,每5秒钟统计一次,做滑动窗口聚合(Sliding Window)
  • 聚合,输出窗口中点击量前N名的商品

代码实现

向Kafka发消息模拟购买事件

public class KafkaProducer {


    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> text = env.addSource(new MyNoParalleSource()).setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        //new FlinkKafkaProducer("topn",new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer("topn",new SimpleStringSchema(),properties);
/*
        //event-timestamp事件的发生时间
        producer.setWriteTimestampToKafka(true);
*/
        text.addSink(producer);
        env.execute();
    }
}//

其中的:MyNoParalleSource 是作者自己实现的一个并行度为1的发送器,用来向kafka发送数据:

public class MyNoParalleSource implements SourceFunction<String> {//1

    //private long count = 1L;
    private boolean isRunning = true;

    /**
     * 主要的方法
     * 启动一个source
     * 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while(isRunning){
            //图书的排行榜
            List<String> books = new ArrayList<>();
            books.add("Pyhton从入门到放弃");//10
            books.add("Java从入门到放弃");//8
            books.add("Php从入门到放弃");//5
            books.add("C++从入门到放弃");//3
            books.add("Scala从入门到放弃");//0-4
            int i = new Random().nextInt(5);
            ctx.collect(books.get(i));

            //每1秒产生一条数据
            Thread.sleep(1000);
        }
    }
    //取消一个cancel的时候会调用的方法
    @Override
    public void cancel() {
        isRunning = false;
    }
}

可见,我们每过1秒向Kafka的topn这个topic随机发送一本书的名字用来模拟购买行为。

整体实现代码如下:

public class TopN {

    public static void main(String[] args) throws Exception{

        /**
         *
         *  书1 书2 书3
         *  (书1,1) (书2,1) (书3,1)
         *
         *
         */
        //每隔5秒钟 计算过去1小时 的 Top 3 商品
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作为时间语义


        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        FlinkKafkaConsumer<String> input = new FlinkKafkaConsumer<>("topn", new SimpleStringSchema(), properties);

        //从最早开始消费 位点
        input.setStartFromEarliest();


        DataStream<String> stream = env
                .addSource(input);

        DataStream<Tuple2<String, Integer>> ds = stream
                .flatMap(new LineSplitter()); //将输入语句split成一个一个单词并初始化count值为1的Tuple2<String, Integer>类型


        DataStream<Tuple2<String, Integer>> wcount = ds
                .keyBy(0)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(5)))
                //key之后的元素进入一个总时间长度为600s,每5s向后滑动一次的滑动窗口
                .sum(1);// 将相同的key的元素第二个count值相加

        wcount
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))//(shu1, xx) (shu2,xx)....
                //所有key元素进入一个5s长的窗口(选5秒是因为上游窗口每5s计算一轮数据,topN窗口一次计算只统计一个窗口时间内的变化)
                .process(new TopNAllFunction(3))
                .print();
//redis sink  redis -> 接口

        env.execute();
    }//





    private static final class LineSplitter implements
            FlatMapFunction<String, Tuple2<String, Integer>> {

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            //String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            /*for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }*/

            //(书1,1) (书2,1) (书3,1)
            out.collect(new Tuple2<String, Integer>(value, 1));
        }
    }

    private static class TopNAllFunction
            extends
            ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow> {

        private int topSize = 3;

        public TopNAllFunction(int topSize) {

            this.topSize = topSize;
        }

        public void process(

                ProcessAllWindowFunction<Tuple2<String, Integer>, String, TimeWindow>.Context arg0,
                Iterable<Tuple2<String, Integer>> input,
                Collector<String> out) throws Exception {

            TreeMap<Integer, Tuple2<String, Integer>> treemap = new TreeMap<Integer, Tuple2<String, Integer>>(
                    new Comparator<Integer>() {

                        @Override
                        public int compare(Integer y, Integer x) {
                            return (x < y) ? -1 : 1;
                        }

                    }); //treemap按照key降序排列,相同count值不覆盖

            for (Tuple2<String, Integer> element : input) {
                treemap.put(element.f1, element);
                if (treemap.size() > topSize) { //只保留前面TopN个元素
                    treemap.pollLastEntry();
                }
            }


            for (Map.Entry<Integer, Tuple2<String, Integer>> entry : treemap
                    .entrySet()) {
                out.collect("=================\n热销图书列表:\n"+ new Timestamp(System.currentTimeMillis()) +  treemap.toString() + "\n===============\n");
            }

        }

    }


}//

查看输出:

=================
热销图书列表:
2019-03-05 22:32:40.004{8=(Java从入门到放弃,8), 7=(C++从入门到放弃,7), 5=(Php从入门到放弃,5)}
===============
=================
热销图书列表:
2019-03-05 22:32:45.004{8=(Java从入门到放弃,8), 7=(C++从入门到放弃,7), 5=(Php从入门到放弃,5)}
===============

所有代码,我放在了我的公众号,回复Flink可以下载

  • 海量【java和大数据的面试题+视频资料】整理在公众号,关注后可以下载~
  • 更多大数据技术欢迎和作者一起探讨~

关注我的公众号,后台回复【JAVAPDF】获取200页面试题!
5万人关注的大数据成神之路,不来了解一下吗?
5万人关注的大数据成神之路,真的不来了解一下吗?
5万人关注的大数据成神之路,确定真的不来了解一下吗?

欢迎您关注《大数据成神之路》
image.png
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,636评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,890评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,680评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,766评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,665评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,045评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,515评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,182评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,334评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,274评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,319评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,002评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,599评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,675评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,917评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,309评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,885评论 2 341

推荐阅读更多精彩内容

  • Hello,大家好!先自我介绍下,我叫....还是不说真名吧低调.....董小姐,传说中的......Mu...
    i董小姐阅读 3,136评论 3 3
  • 猪年大吉,上一个猪年好像才刚刚过去,又好像早已远去。 我写的东西好像都挺悲观消极的,可有什么办法呢,这是生活带给我...
    快乐的小星星阅读 164评论 2 6
  • 早晨,我迫不及待的问老爸:“什么时候钓鱼。”老爸说:作业完成,下午三点。听完这句话,我立刻行动起来了,我写啊写,写...
    帅云吞1阅读 357评论 0 1
  • 想通过一场会销现场收款600——1000万[红包][红包][红包]吗? 想通过卡模式一次活动现场储值80——100...
    富民之路阅读 961评论 0 1
  • mns区块链商城到底是一个什么样的商城平台?和其它商城有什么区别呢?MNS商城是建立在区块链的技术和算法的全球流通...
    波波谈古论今阅读 549评论 0 0