Stream Processing in Practice

First up is a 3D bar chart. I originally planned to use ECharts, but then discovered that Python can draw one too.

1529819848329.png

The data lives in MySQL; I filled the table with some fake rows and read it through an ORM (SQLAlchemy).

from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

engine = create_engine('mysql://root:123456@192.168.0.104/student?charset=utf8')
Session = sessionmaker(bind=engine)
sess = Session()
Base = declarative_base()

class UserAction(Base):
    __tablename__ = 'user_action'

    id = Column(Integer, primary_key=True)
    province = Column(String(255))
    month = Column(String(255))
    number = Column(Integer)
        
if __name__ == '__main__':
    Base.metadata.create_all(engine)
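To try the same table layout without a MySQL server, here is a minimal sketch that swaps in the standard-library sqlite3 module for MySQL and SQLAlchemy. The sample rows and the helper name lookup_number are invented for illustration; only the table and column names come from the model above.

```python
import sqlite3

# In-memory database standing in for the MySQL "student" database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE user_action ("
    " id INTEGER PRIMARY KEY,"
    " province VARCHAR(255),"
    " month VARCHAR(255),"
    " number INTEGER)"
)

# Invented sample rows, only for illustration.
conn.executemany(
    "INSERT INTO user_action (province, month, number) VALUES (?, ?, ?)",
    [("北京", "1", 120), ("北京", "2", 95), ("上海", "1", 80)],
)


def lookup_number(province, month):
    """Return the number for one (province, month) cell, or 0 if no row exists."""
    row = conn.execute(
        "SELECT number FROM user_action WHERE province = ? AND month = ?",
        (province, month),
    ).fetchone()
    return row[0] if row else 0


print(lookup_number("北京", "2"))
```

The parameterized query plays the same role as filter_by(province=..., month=...) in the ORM version.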

Next comes the plotting.

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from plot_orm import UserAction
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D


engine = create_engine('mysql://root:123456@192.168.0.104/student?charset=utf8')
Session = sessionmaker(bind=engine)
session = Session()

def gather_provinces():
    users = session.query(UserAction)
    provinces = [user.province for user in users]
    provinces = list(set(provinces))
    return provinces


def gather_months():
    users = session.query(UserAction)
    months = [user.month for user in users]
    months = sorted(list(set(months)))
    return months


def gather_number(province, month):
    # number for one (province, month) cell; 0 when the table has no such row
    row = session.query(UserAction).filter_by(province=province, month=month).first()
    return row.number if row else 0

def plot_3d_bars(x, y):
    # use a font with Chinese glyphs so the province names render
    plt.rcParams['font.sans-serif'] = ['SimHei']

    # x --> months, y --> provinces; bar3d needs numbers, so plot integer positions
    fake_x = list(range(len(x)))
    fake_y = list(range(len(y)))

    _xx, _yy = np.meshgrid(x, y)
    fake_xx, fake_yy = np.meshgrid(fake_x, fake_y)

    # flatten the grids into 1-D arrays, one entry per bar
    xs, ys = _xx.ravel(), _yy.ravel()
    # print(xs, ys)
    fake_xs, fake_ys = fake_xx.ravel(), fake_yy.ravel()

    top = [gather_number(ys[i], xs[i]) for i in range(len(xs))]
    bottom = np.zeros_like(top)
    width = depth = 1

    # plot 3d bars
    fig = plt.figure()
    ax = fig.add_subplot(111, projection='3d')
    # print(fake_xs, fake_ys, bottom, width, depth, top)

    ax.bar3d(fake_xs, fake_ys, bottom, width, depth, top, shade=True)
    ax.set_title('V.Vader')

    ax.set_xlabel('month')
    ax.set_ylabel('province')
    ax.set_zlabel('number of people')
    # ax.set_xlim(x)
    ax.set_xticks(fake_x)
    ax.set_xticklabels(x)
    ax.set_yticks(fake_y)
    ax.set_yticklabels(y)
    plt.show()

def main():
    plot_3d_bars(['1', '2', '3', '4'], ['南昌', '北京', '上海', '杭州'])

if __name__ == '__main__':
    main()
Figure_1.png
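The key trick in plot_3d_bars is that bar3d needs numeric coordinates, so each label is replaced by its integer position and restored later as a tick label. The pure-Python sketch below reproduces the pairing order that np.meshgrid (with its default indexing) followed by ravel() yields; the helper name bar_positions is made up for this example.

```python
def bar_positions(months, provinces):
    """Pair every province with every month, months varying fastest.

    Returns (x_pos, y_pos, month, province) tuples, where x_pos and y_pos
    are the integer positions plotted in place of the text labels.
    """
    cells = []
    for y_pos, province in enumerate(provinces):  # one row per province
        for x_pos, month in enumerate(months):    # months vary fastest
            cells.append((x_pos, y_pos, month, province))
    return cells


cells = bar_positions(['1', '2'], ['南昌', '北京'])
for cell in cells:
    print(cell)
```

Each tuple corresponds to one bar: the first two entries say where it stands, the last two say which row to look up for its height.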

Start ZooKeeper and Kafka

zkServer.sh start

kafka-server-start.sh  /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties
1529808467435.png

First, use a Kafka producer to send the data to the userActionLog topic.

import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='192.168.0.106:9092')

# Send the CSV to Kafka one raw line per second, skipping the header row.
with open('data/user_log.csv', 'r', encoding='utf-8') as csv_file:
    next(csv_file, None)  # skip the header line
    for line in csv_file:
        info = line.rstrip('\n')
        producer.send('userActionLog', value=info.encode('utf-8'))
        print(info)
        time.sleep(1)

producer.flush()  # deliver any buffered messages before exiting
1529808722489.png
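The producer sends each CSV row as raw bytes, since kafka-python expects bytes when no value serializer is configured. The sketch below isolates that preparation step so it can be tried without a broker; iter_messages is a hypothetical helper and the sample columns are invented.

```python
import io


def iter_messages(lines):
    """Yield each data row as UTF-8 bytes, skipping the header row."""
    rows = iter(lines)
    next(rows, None)                 # drop the header line
    for line in rows:
        text = line.rstrip('\r\n')
        if text:                     # ignore blank lines
            yield text.encode('utf-8')


# Stand-in for data/user_log.csv; these column names are invented.
sample = io.StringIO("user_id,item_id\n1,100\n\n2,200\n")
messages = list(iter_messages(sample))
print(messages)
```

In the real script, each yielded value would be passed to producer.send('userActionLog', value=...).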

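As you can see, the data is being sent. (The upside of developing on Windows is that the IDE and other tooling are pleasant to use; remember to edit the hosts file, or the program will not run.)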

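Next comes the Scala program. I had genuinely never learned Scala before, so I spent two days studying it. Even though I had never written any, it turned out to be fairly easy to pick up.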

First, some Kafka configuration:

package iceberg.kafka;

public class KafkaProperties {

    public static final String ZK = "192.168.0.106:2181";

    public static final String TOPIC = "userActionLog";

    public static final String BROKER_LIST = "192.168.0.106:9092";

    public static final String GROUP_ID = "V.Vader";
}

Count the male and female shoppers in each age group: 18-30 (age codes 2 and 3), 30-40 (codes 4 and 5), 40-50 (code 6), and 50-60 (codes 7 and 8).

Find the top 10 items by number of user favorites and by number of purchases, and store the results in the MySQL database.
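Before reading the Spark job, it may help to see the age and gender bucketing as plain Python. This is only a sketch of the counting rule, not the streaming code: the age codes come from the task description, the gender codes (0 female, 1 male) come from the Spark job in this post, and the function names are invented. Like the streaming job, it counts records rather than distinct users.

```python
from collections import Counter

# Age code -> age group, as listed in the task description.
AGE_BUCKETS = {2: "18-30", 3: "18-30", 4: "30-40", 5: "30-40",
               6: "40-50", 7: "50-60", 8: "50-60"}
# Gender code -> label, as used by the Spark job.
GENDERS = {0: "female", 1: "male"}


def classify(age_code, gender_code):
    """Map one record's codes to a (gender, age group) key."""
    gender = GENDERS.get(gender_code, "others")
    age = AGE_BUCKETS.get(age_code, "others")
    return ("gender:" + gender, "age:" + age)


def count_shoppers(records):
    """Count records per (gender, age group); records are (age_code, gender_code)."""
    return Counter(classify(age, gender) for age, gender in records)


counts = count_shoppers([(2, 0), (3, 0), (4, 1), (9, 1)])
for key, value in counts.items():
    print(key, value)
```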

package iceberg

import java.sql.DriverManager
import java.util.HashMap

import iceberg.kafka.KafkaProperties
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}


object UserTop {
  def main(args: Array[String]): Unit = {

    val kafkaTopic: Map[String, Int] = Map[String, Int](KafkaProperties.TOPIC -> 1)

    val conf = new SparkConf().setAppName("KafkaSparkStream").setMaster("local[2]")

    val ssc = new StreamingContext(conf, Seconds(6))

    val stream = KafkaUtils.createStream(ssc, KafkaProperties.ZK, KafkaProperties.GROUP_ID, kafkaTopic, StorageLevel.MEMORY_ONLY)

    val logs = stream.map(_._2)

    val info = logs.map(line=>{(line.split(",")(8), line.split(",")(9))})
      .map(line=>{
        if (line._2.toInt == 0 ) {
          (line._1, "gender:female")
        }else{
          if (line._2.toInt == 1 ) {
            (line._1, "gender:male")
          }else{
            (line._1, "gender:others")
          }
        }
      })
    // Age codes: 2-3 -> 18-30, 4-5 -> 30-40, 6 -> 40-50, 7-8 -> 50-60; anything else is "others"
    val info2 = info.map { case (age, gender) =>
      val bucket = age.toInt match {
        case 2 | 3 => "age:18-30"
        case 4 | 5 => "age:30-40"
        case 6     => "age:40-50"
        case 7 | 8 => "age:50-60"
        case _     => "age:others"
      }
      (gender, bucket)
    }

    val result = info2.map((_, 1)).reduceByKey(_+_)
    // send the results to Kafka
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.106:9092")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
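    result.foreachRDD(rdd=>{
      rdd.foreachPartition(partitionOfRecords=>{
        // create the producer on the executor, once per partition
        val producer = new KafkaProducer[String, String](props)
        partitionOfRecords.foreach(pair=>{
          // e.g. "(gender:male,age:18-30)5": the key tuple followed by its count
          val str = pair._1.toString + pair._2.toString
          val message = new ProducerRecord[String, String]("result", null, str)
          producer.send(message)
        })
        producer.close() // flush pending records and release the connection
      })
    })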

    // (2) top 10 items by number of user favorites and purchases, stored in the MySQL database
    val task2 = logs.map(line=>{(line.split(",")(2), line.split(",")(7))})
    val collection = task2.filter(_._2=="3")
    val buy = task2.filter(_._2=="2")

    buy.map(item => (item._1, 1)).reduceByKey(_ + _).foreachRDD(rdd =>{
      val connection = createConnection()
      try {
        // a prepared statement avoids building SQL by string concatenation
        val stmt = connection.prepareStatement("insert into top(cat, num) values (?, ?)")
        rdd.sortBy(_._2, false).take(10).foreach(pair=>{
          println(pair)
          stmt.setString(1, pair._1)
          stmt.setInt(2, pair._2)
          stmt.executeUpdate()
          println("execute sql")
        })
      } finally {
        connection.close() // otherwise every batch leaks a connection
      }
    })
    collection.map(item => (item._1, 1)).reduceByKey(_ + _).foreachRDD(rdd =>{
      val connection = createConnection()
      try {
        val stmt = connection.prepareStatement("insert into top(cat, num) values (?, ?)")
        rdd.sortBy(_._2, false).take(10).foreach(pair=>{
          println(pair)
          stmt.setString(1, pair._1)
          stmt.setInt(2, pair._2)
          stmt.executeUpdate()
          println("execute sql")
        })
      } finally {
        connection.close()
      }
    })

    def createConnection()={
      Class.forName("com.mysql.jdbc.Driver")
      DriverManager.getConnection("jdbc:mysql://192.168.0.104:3306/student", "root","123456")
    }

    ssc.start()

    ssc.awaitTermination()
  }
}
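The top-10 part of the job boils down to "filter by action code, count per item, keep the ten largest". A rough single-machine equivalent in Python looks like the sketch below. The action codes ("2" for purchases, "3" for favorites) are the ones the Scala filters use; the function name and sample events are invented.

```python
from collections import Counter


def top_items(events, action, n=10):
    """Return the n most frequent items among events with the given action code.

    events is an iterable of (item_id, action_code) pairs.
    """
    counts = Counter(item for item, code in events if code == action)
    return counts.most_common(n)


# Invented sample events: (item_id, action_code).
events = [("a", "2"), ("b", "2"), ("a", "2"), ("c", "3"), ("a", "3")]
top_bought = top_items(events, "2")
print(top_bought)
```

Unlike this sketch, the streaming job computes the ranking separately for every 6-second batch, so each batch appends its own top 10 to the table.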

Next, display the data with Flask.

I had always used Django before (although the web framework I have actually used most is Spring Boot). Now that I have tried Flask, it feels great to work with too.

from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
import time


app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)


thread = None
consumer = KafkaConsumer('result', bootstrap_servers='192.168.0.106:9092')
result_list = [{"g0": "0"}, {"g1": "0"}, {"g2": "0"}, {"g3": "0"}, {"b0": "0"}, {"b1": "0"}, {"b2": "0"}, {"b3": "0"}]


def background_thread():
    # map each key sent by the Spark job to its slot; g* = girls (female), b* = boys (male)
    slots = {
        'gender:female,age:18-30': (0, 'g0'),
        'gender:female,age:30-40': (1, 'g1'),
        'gender:female,age:40-50': (2, 'g2'),
        'gender:female,age:50-60': (3, 'g3'),
        'gender:male,age:18-30': (4, 'b0'),
        'gender:male,age:30-40': (5, 'b1'),
        'gender:male,age:40-50': (6, 'b2'),
        'gender:male,age:50-60': (7, 'b3'),
    }
    for msg in consumer:
        # messages look like "(gender:male,age:18-30)5"
        str_msg = msg.value.decode('utf-8')
        info = str_msg.split(')')
        condition = info[0].split('(')[1]
        value = info[1]
        if condition in slots:
            index, key = slots[condition]
            result_list[index][key] = value
        print('test_message', result_list)
        socketio.emit('test_message', {'data': result_list})
        socketio.sleep(1)  # cooperative sleep that suits the Socket.IO async mode


@socketio.on('test_connect')
def connect(message):
    print('message', message)
    global thread
    if thread is None:
        print('thread is None starting socket_io')
        thread = socketio.start_background_task(target=background_thread)
    socketio.emit('connected', {'data': 'server connected'})


@app.route('/')
def hello_world():
    return render_template("index.html")


if __name__ == '__main__':
    socketio.run(app, debug=True)
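The messages on the result topic are the Scala tuple's string form followed by the count, for example "(gender:male,age:18-30)5". A slightly more defensive way to take such a message apart is sketched below; parse_result is a hypothetical helper, not part of the app above.

```python
def parse_result(message):
    """Split '(gender:male,age:18-30)5' into ('gender:male,age:18-30', 5).

    Returns None when the message does not have that shape.
    """
    key, sep, count = message.rpartition(')')
    if not sep or not key.startswith('(') or not count.isdigit():
        return None
    return key[1:], int(count)


parsed = parse_result('(gender:male,age:18-30)5')
print(parsed)
print(parse_result('garbage'))
```

Returning None for malformed input lets the consumer loop skip a bad message instead of raising an IndexError.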

All that remains is the page that displays the data.

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>DashBoard</title>
    <script src="static/js/socket.io.js"></script>
    <script src="static/js/jquery-3.1.1.min.js"></script>
    <script src="static/js/highcharts.js"></script>
    <script src="static/js/exporting.js"></script>

    <script type="text/javascript" charset="utf-8">
        var socket = io.connect('http://' + document.domain + ':' + location.port);
        socket.on('connect', function() {
            socket.send('user has connected!');
            socket.emit('test_connect', {data: 'I\'m connected!'});
        });
        socket.on('connected',function(data){
            console.log(data)
        });

        socket.on('test_message',function(message){
            var obj = message; // Socket.IO already delivers a parsed object
            var result = obj['data']

            $('#g1').html(result[0]['g0'])
            $('#g2').html(result[1]['g1'])

            $('#g3').html(result[2]['g2'])
            $('#g4').html(result[3]['g3'])

            $('#b1').html(result[4]['b0'])
            $('#b2').html(result[5]['b1'])

            $('#b3').html(result[6]['b2'])
            $('#b4').html(result[7]['b3'])


        });

    </script>
</head>
<body>
<div>
    <b>Girl:18-30: </b><b id="g1"></b>
    <b>Girl:30-40: </b><b id="g2"></b>
    <b>Girl:40-50: </b><b id="g3"></b>
    <b>Girl:50-60: </b><b id="g4"></b>
    <br>
    <b>Boy:18-30: </b><b id="b1"></b>
    <b>Boy:30-40: </b><b id="b2"></b>
    <b>Boy:40-50: </b><b id="b3"></b>
    <b>Boy:50-60: </b><b id="b4"></b>
</div>
<div id="container" style="width: 600px;height:400px;"></div>

<script type="text/javascript">
    $(document).ready(function () {
    Highcharts.setOptions({
        global: {
            useUTC: false
        }
    });

    Highcharts.chart('container', {
        chart: {
            type: 'spline',
            animation: Highcharts.svg, // don't animate in old IE
            marginRight: 10,
            events: {
                load: function () {

                    // set up the updating of the chart each second:
                    // series i reads the count currently shown in element ids[i]
                    var series = this.series;
                    var ids = ['#g1', '#g2', '#g3', '#g4', '#b1', '#b2', '#b3', '#b4'];

                    setInterval(function () {
                        var x = (new Date()).getTime(); // current time
                        ids.forEach(function (id, i) {
                            // fall back to 0 until the first message arrives
                            var y = parseInt($(id).text(), 10) || 0;
                            // redraw only once, after the last series is updated
                            series[i].addPoint([x, y], i === ids.length - 1, true);
                        });
                    }, 1000);
                }
            }
        },
        title: {
            text: 'Real-time shopper counts by gender'
        },
        xAxis: {
            type: 'datetime',
            tickPixelInterval: 50
        },
        yAxis: {
            title: {
                text: 'Count'
            },
            plotLines: [{
                value: 0,
                width: 1,
                color: '#808080'
            }]
        },
        tooltip: {
            formatter: function () {
                return '<b>' + this.series.name + '</b><br/>' +
                    Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
                    Highcharts.numberFormat(this.y, 2);
            }
        },
        legend: {
            enabled: true
        },
        exporting: {
            enabled: true
        },
        series: [
            {
            name: 'Female shoppers, 18-30',
            data: (function () {
                // generate an array of random data
                var data = [],
                    time = (new Date()).getTime(),
                    i;

                for (i = -19; i <= 0; i += 1) {
                    data.push({
                        x: time + i * 1000,
                        y: Math.random()
                    });
                }
                return data;
            }())
        }
        ,
        {
            name: 'Female shoppers, 30-40',
            data: (function () {
                // generate an array of random data
                var data = [],
                    time = (new Date()).getTime(),
                    i;

                for (i = -19; i <= 0; i += 1) {
                    data.push({
                        x: time + i * 1000,
                        y: Math.random()
                    });
                }
                return data;
            }())
        }
        ,
        {
            name: 'Female shoppers, 40-50',
            data: (function () {
                // generate an array of random data
                var data = [],
                    time = (new Date()).getTime(),
                    i;

                for (i = -19; i <= 0; i += 1) {
                    data.push({
                        x: time + i * 1000,
                        y: Math.random()
                    });
                }
                return data;
            }())
        }
        ,
        {
            name: 'Female shoppers, 50-60',
            data: (function () {
                // generate an array of random data
                var data = [],
                    time = (new Date()).getTime(),
                    i;

                for (i = -19; i <= 0; i += 1) {
                    data.push({
                        x: time + i * 1000,
                        y: Math.random()
                    });
                }
                return data;
            }())
        },
        {
            name: 'Male shoppers, 18-30',
            data: (function () {
                // generate an array of random data
                var data = [],
                    time = (new Date()).getTime(),
                    i;

                for (i = -19; i <= 0; i += 1) {
                    data.push({
                        x: time + i * 1000,
                        y: Math.random()
                    });
                }
                return data;
            }())
        }
        ,
            {
            name: 'Male shoppers, 30-40',
            data: (function () {
                // generate an array of random data
                var data = [],
                    time = (new Date()).getTime(),
                    i;

                for (i = -19; i <= 0; i += 1) {
                    data.push({
                        x: time + i * 1000,
                        y: Math.random()
                    });
                }
                return data;
            }())
        }
        ,
            {
            name: 'Male shoppers, 40-50',
            data: (function () {
                // generate an array of random data
                var data = [],
                    time = (new Date()).getTime(),
                    i;

                for (i = -19; i <= 0; i += 1) {
                    data.push({
                        x: time + i * 1000,
                        y: Math.random()
                    });
                }
                return data;
            }())
        }
        ,
            {
            name: 'Male shoppers, 50-60',
            data: (function () {
                // generate an array of random data
                var data = [],
                    time = (new Date()).getTime(),
                    i;

                for (i = -19; i <= 0; i += 1) {
                    data.push({
                        x: time + i * 1000,
                        y: Math.random()
                    });
                }
                return data;
            }())
        }
        ,
        ]
    });
});
</script>
</body>
</html>
1529809637345.png

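With this, the socket pushes the data out to the page, and the result looks pretty interesting. Next I plan to use HBase, Redis, Python, and TensorFlow to build a system that processes and displays stock data in real time, with ECharts for the front-end page. I am looking forward to seeing how I do (really looking forward to it).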

1529809692848.png
1529809823927.png

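Oh, and the entries at the bottom of the chart can be toggled, so you can filter the view down to the series you care about.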
