Spark Streaming实时流处理项目

项目简介
  • 今天到现在为止实战课程的访问量
  • 今天到现在为止从搜索引擎引流过来的实战课程的访问量
项目流程

需求分析 ==> 数据产生 ==> 数据采集 ==> 数据清洗 ==> 数据统计分析 ==> 统计结果入库 ==> 数据可视化

分布式日志收集框架Flume(印象笔记)

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible(灵活的) architecture(架构) based on streaming data flows. It is robust(健壮) and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

分布式流处理平台kafka

一个配置文件一个broker

单节点单broker,不用修改配置文件
单节点多broker,复制多份配置文件,broker.id,listeners端口号,log.dirs路径唯一

IDEA+Maven编程开发

<dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
</dependency>
<dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
</dependency>
package com.test.kafka;

public class KafkaProperties {

    public static final String ZK = "192.168.247.100:2181";

    public static final String TOPIC = "test_repliation_3";

    public static final String BROKER_LIST = "192.168.247.100:9092,192.168.247.100:9093,192.168.247.100:9094";

    public static final String GROUP_ID = "testGroup";
}
package com.test.kafka;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class KafkaConsumer extends Thread {

    private String topic;

    public KafkaConsumer(String topic){
        this.topic = topic;
    }

    private ConsumerConnector createConnctor(){

        Properties properties = new Properties();
        properties.put("zookeeper.connect",KafkaProperties.ZK);
        properties.put("group.id",KafkaProperties.GROUP_ID);
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConnctor();
        Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
        topicCountMap.put(topic,1);

        //String :topic
        //List: 数据流
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0);

        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        while(iterator.hasNext()){
            String message = new String(iterator.next().message());
            System.out.println("rec: "+ message);
        }

    }
}
package com.test.kafka;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducer extends Thread{

    private String topic;

    private Producer<Integer, String> producer;

    public KafkaProducer(String topic){
        this.topic = topic;

        Properties properties = new Properties();
        properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks","1");

        producer = new Producer<Integer, String>(new ProducerConfig(properties));
    }

    @Override
    public void run() {
        int messageNo = 1;

        while(true){
            String message = "message "+messageNo;
            producer.send(new KeyedMessage<Integer, String>(topic, message));
            System.out.println("Sent: "+message);

            messageNo ++;

            try {
                Thread.sleep(2000);
            } catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}
package com.test.kafka;

public class KafkaClientApp {

    public static void main(String[] args) {
        new KafkaProducer(KafkaProperties.TOPIC).start();

        new KafkaConsumer(KafkaProperties.TOPIC).start();
    }
}
Flume对接Kafka

kafka sink

type=org.apache.flume.sink.kafka.KafkaSink
brokerList
topic
batchSize
requiredAcks

实战环境搭建
  • JDK安装
  • Scala安装
  • Maven安装
  • Hadoop安装
  • zookeeper安装
  • Hbase安装
  • Spark安装
  • IDEA+Maven+Spark Streaming
    添加依赖
<repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>

  <dependencies>

    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>
example
NetworkWordCount(基于网络实时的文字统计功能)

nc -lk 9999//在9999端口发消息

  • spark-submit提交作业(生产)

spark-submit \ --class org.apache.spark.examples.streaming \ --name NetworkWordCount \ --master local[2] \ /home/kang/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/jars/spark-examples_2.11-2.1.0.jar \ localhost 9999

  • spark-shell提交(测试)

spark-shell --master local[2] --jars ~/lib/mysql-connector-java-5.1.34.jar

import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("192.168.247.100", 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
工作原理
  • 粗粒度

Spark Streaming接收到实时数据流,把数据按照指定的时间段切成一片片小的数据块,然后把小的数据块传给Spark Engine处理

核心概念
StreamingContext

常用的两个构造方法
The batch interval must be set based on the latency requirements of your application and available cluster resources.

def this(sparkContext: SparkContext, batchDuration: Duration) = {
    this(sparkContext, null, batchDuration)
  }

def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }
After a context is defined, you have to do the following.
  • Define the input sources by creating input DStreams.
  • Define the streaming computations by applying transformation and output operations to DStreams.
  • Start receiving data and processing it using streamingContext.start().
  • Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  • The processing can be manually stopped using streamingContext.stop().
Points to remember:
  • Once a context has been started, no new streaming computations can be set up or added to it.
  • Once a context has been stopped, it cannot be restarted.
  • Only one StreamingContext can be active in a JVM at the same time.
  • stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false.
  • A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
DStreams

Discretized Stream or DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream. Internally, a DStream is represented by a continuous series of RDDs, which is Spark’s abstraction of an immutable, distributed dataset.

Any operation applied on a DStream translates to operations on the underlying RDDs

Input DStreams and Receivers

Input DStreams are DStreams representing the stream of input data received from streaming sources. In the quick example, lines was an input DStream as it represented the stream of data received from the netcat server. Every input DStream (except file stream, discussed later in this section) is associated with a Receiver (Scala doc, Java doc) object which receives the data from a source and stores it in Spark’s memory for processing.

Transformations on DStreams
Output Operations on DStreams
案例实战
socket数据

需添加依赖

    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.6.5</version>
    </dependency>

    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>
package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming处理Socket数据
  */
object NetworkWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val line = ssc.socketTextStream("192.168.247.100",6789)

    val result = line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

发送socket数据:nc -lk 6789

文件系统

File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.)

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 处理文件系统数据
  */
object FileWorkCount {

  def main(args: Array[String]): Unit = {
    //文件系统不需要receiver,可以只用一个线程
    val sparkConf = new SparkConf().setMaster("local").setAppName("NetworkWordCount")

    val ssc = new StreamingContext(sparkConf,Seconds(5))

    //文件整体moving过去
    val line = ssc.textFileStream("hdfs://192.168.247.100:9000/sparkStreaming/")
    val result = line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
updateStateByKey
package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    val ssc = new StreamingContext(sparkConf,Seconds(5))

    //如果使用stateful的算子,必须要设置checkpoint目录
    ssc.checkpoint("hdfs://192.168.247.100:9000/sparkStreaming/")

    val line = ssc.socketTextStream("192.168.247.100",6789)

    val result = line.flatMap(_.split(" ")).map((_,1))

    val state = result.updateStateByKey[Int](updateFunction _)

    state.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = newValues.sum + runningCount.getOrElse(0)
    Some(newCount)
  }
}
数据保存到数据库
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}
transform函数的使用之黑名单过滤

zhangshan,123
lisi,123
wangwu,123

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TransformApp {

  def main(args: Array[String]): Unit = {
      val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

      val ssc = new StreamingContext(sparkConf,Seconds(5))

      /**
        * 构建黑名单
        */
      val blacks = List("zhangshan","lisi")
      val balcksRDD = ssc.sparkContext.parallelize(blacks).map((_,true))

      val line = ssc.socketTextStream("192.168.247.100",6789)

      val clicklog = line.map(x => (x.split(",")(0),x)).transform(rdd => {
        rdd.leftOuterJoin(balcksRDD)
          .filter(x => x._2._2.getOrElse(false) != true)
          .map(_._2._1)
      })

      clicklog.print()

      ssc.start()
      ssc.awaitTermination()
  }
}
Spark Streaming整合SparkSQL
package com.test

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object sqlNetworkWordCount {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val line = ssc.socketTextStream("192.168.247.100",6789)

    val words = line.flatMap(_.split(" "))

    words.foreachRDD { (rdd,time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

case class Record(word: String)

object SparkSessionSingleton {

  @transient  private var instance: SparkSession = _

  def getInstance(sparkConf: SparkConf): SparkSession = {
    if (instance == null) {
      instance = SparkSession
        .builder
        .config(sparkConf)
        .getOrCreate()
    }
    instance
  }
}
Spark Streaming整合Flume
Push方式

添加相关的依赖

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

Flume Agent配置


# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source 
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /home/kang/flume.log

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = 192.168.247.100:9092
a1.sinks.k1.topic = hello_topic
a1.sinks.k1.batchSize = 100
a1.sinks.k1.requiredAcks = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1

Idea 代码

package com.test

import org.apache.spark.streaming.flume._
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming整合Flume第一种方式
  */
object FlumePushWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2){
      System.err.println("Usage FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val flumeStream = FlumeUtils.createStream(ssc,hostname,port.toInt)
    flumeStream.map(item => new String(item.event.getBody.array()).trim)
        .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

1)启动sparkstreaming 作业
2)启动flume agent
flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/avro.conf --name a1 -Dflume.root.logger=INFO,console

  1. 输入数据,观察IDEA控制台输出
  • 服务器上运行
    ./bin/spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.1.0 ...
Pull方式
  • 使用可靠的Receiver

可靠的接收器在接收到数据并将数据存储在Spark中时正确地向可靠的源发送确认。

添加依赖

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>

Flume Agent配置

# Name the components on this agent   //取别名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/kang/infos.txt

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.247.1
a1.sinks.k1.port = 4444

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel   
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

IDEA 代码

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming整合Flume第二种方式
  */
object FlumePullWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2){
      System.err.println("Usage FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val flumeStream = FlumeUtils.createPollingStream(ssc,hostname,port.toInt)
    flumeStream.map(item => new String(item.event.getBody.array()).trim)
        .flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

先启动Flume,再启动SparkStreaming

Spark Streaming整合Kafka
Receiver-based

启动kafka:kafka-server-start.sh $KAFKA_HOME/config/server-1.properties
创建topic:kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
启动生产者:kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_streaming_topic

IDEA代码

package com.test

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 4){
      System.err.println("Usage KafkaReceiverWordCount <zkQuorum> <group> <topic> <numThread>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topic, numThread) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val topicMap = topic.split(",").map((_, numThread.toInt)).toMap

    val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    //message第二个才是有效信息
    message.map(_._2).flatMap(_.split(" ")).map((_,1))
        .reduceByKey(_+_).print()


    ssc.start()
    ssc.awaitTermination()
  }
}
direct方式
package com.test

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {

    if(args.length != 2){
      System.err.println("Usage KafkaReceiverWordCount <brokers> <topic>")
      System.exit(1)
    }

    val Array(brokers, topic) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaDirectWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(5))


    val topicSet = topic.split(",").toSet
    val kafkaParams = Map[String,String]("metadata.broker.list"-> brokers)

    val message = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)

    //message第二个才是有效信息
    message.map(_._2).flatMap(_.split(" ")).map((_,1))
        .reduceByKey(_+_).print()


    ssc.start()
    ssc.awaitTermination()
  }
} 
log4j+flume+kafka+sparkStreaming
  • 编写log4j.properties
log4j.rootLogger = INFO,stdout,flume

log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%C] [%p] - %m%n

log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = 192.168.247.100
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
  • 添加依赖包
    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.6.0</version>
    </dependency>
  • logger 产生器
import org.apache.log4j.Logger;

public class LoggerGenerator {

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws Exception {

        int index = 0;
        while(true){
            Thread.sleep(2000);
            logger.info("value : " + index++ );
        }
    }
}
  • flume 配置文件
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source 
a1.sources.r1.type = avro
a1.sources.r1.bind = 192.168.247.100
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = 192.168.247.100:9092
a1.sinks.k1.topic = hello_topic
a1.sinks.k1.batchSize = 10
a1.sinks.k1.requiredAcks = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
项目实战
  • 为什么要记录用户访问行为日志
    网站页面的访问量
    网站的粘性
    推荐
python日志产生器
  • python代码
#coding=utf-8
import random
import time
url_path = [
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/146.html",
    "class/131.html",
    "class/130.html",
    "learn/821",
    "course/list",
]

ip_slices = [123,132,10,98,43,55,72,89,31,192,168,247,99]

http_reference = [
    "http://www.baidu.com/s?wd={query}",
    "http://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}",
    "http://search.yahoo.com/search?p={query}"
]

search_keyword = [
    "Spark SQL实战"
    "Hadoop基础",
    "Storm实战",
    "Spark Streaming实战",
    "大数据",
    "java"
]

status_codes = ["200", "404", "500"]

def sample_ip():
    slice = random.sample(ip_slices, 4)
    return ".".join([str(item) for item in slice])

def sample_url():
    return random.sample(url_path, 1)[0]

def sample_referer():
    if random.uniform(0, 1) > 0.2:
        return "-"
    
    refer_str = random.sample(http_reference,1)
    query_str = random.sample(search_keyword, 1)
    return refer_str[0].format(query=query_str[0])
    
def sample_status_code():
    return random.sample(status_codes, 1)[0]
    
def generate_log(count = 10):
    time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    f = open("/home/kang/project/SparkStreaming/logs/access.log","w+")
    while count >= 1:
        query_log = "{ip}\t{localtime}\t\"GET /{url} HTTP/1.1\"\t{status}\t{referer}".format(url=sample_url(), ip=sample_ip(), referer=sample_referer(), status=sample_status_code(), localtime=time_str)
        print query_log
        f.write(query_log + "\n")
        count = count - 1

if __name__ == '__main__':
    generate_log(100)
  • 定时调度器工具crontab

一分钟执行一次
crontab -e
*/1 * * * * /home/kang/project/SparkStreaming/shell/generator_log.sh
:x 执行

flume+kafka+sparkstreaming连通,清洗
  • flume
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source 
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/kang/project/SparkStreaming/logs/access.log

# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.brokerList = 192.168.247.100:9092
a1.sinks.k1.topic = hello_topic
a1.sinks.k1.batchSize = 10
a1.sinks.k1.requiredAcks = 1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • Spark
package com.test.project

import com.test.project.domain.ClickLog
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ImoocStatStreamingApp {

  def main(args: Array[String]): Unit = {

    if(args.length != 4){
      System.err.println("Usage KafkaReceiverWordCount <zkQuorum> <group> <topic> <numThread>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topic, numThread) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(60))

    val topicMap = topic.split(",").map((_, numThread.toInt)).toMap

    val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    //测试数据接收
//    message.map(_._2).print()

    val logs = message.map(_._2)
    val cleanData = logs.map(line => {
      val infos = line.split("\t")
      val url = infos(2).split(" ")(1)

      var courseId = 0

      if(url.startsWith("/class")){
        val courseIdHTML = url.split("/")(2)
        courseId = courseIdHTML.substring(0,courseIdHTML.lastIndexOf(".")).toInt
      }
      ClickLog(infos(0), infos(1), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0)

    cleanData.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
今天到现在为止实战课程的访问量
  • HBase表设计

创建表:create 'imooc_course_clickcount', 'info'
RowKey设计:day_courseid

  • HBase DAO
package com.test.project.dao

import com.test.project.domain.CourseClickCount
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import utils.HbaseUtil

import scala.collection.mutable.ListBuffer

/**
  * 访问层
  */
object CourseClickCountDAO {

  val tableName = "imooc_course_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  def save(list: ListBuffer[CourseClickCount]) = {

    val table = HbaseUtil.getInstance().getTable(tableName)
    for(ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
    }
  }

  def count(day_course:String):Long = {

    val table = HbaseUtil.getInstance().getTable(tableName)
    val get = new Get(Bytes.toBytes(day_course))
    val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)

    if(value == null){
      0l
    } else{
      Bytes.toLong(value)
    }
  }

  def main(args: Array[String]): Unit = {
    val list = new ListBuffer[CourseClickCount]

    list+=(CourseClickCount("20190314_7",18),CourseClickCount("20190313_6",66))

    save(list)

    println(count("20190314_7") + count("20190313_6"))
  }
}
今天到现在为止搜索引擎引流过来的实战课程访问量
  • HBase表设计

创建表:create 'imooc_search_course_clickcount', 'info'
RowKey设计:day_referer_courseid

  • HBase DAO
package com.test.project.dao

import com.test.project.domain.{CourseClickCount, CourseSearchClickCount}
import org.apache.hadoop.hbase.client.Get
import org.apache.hadoop.hbase.util.Bytes
import utils.HbaseUtil

import scala.collection.mutable.ListBuffer

/**
  * 访问层
  */
object CourseSearchClickCountDAO {

  val tableName = "imooc_course_search_clickcount"
  val cf = "info"
  val qualifer = "click_count"

  def save(list: ListBuffer[CourseSearchClickCount]) = {

    val table = HbaseUtil.getInstance().getTable(tableName)
    for(ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
        Bytes.toBytes(cf),
        Bytes.toBytes(qualifer),
        ele.click_count)
    }
  }

  def count(day_course:String):Long = {

    val table = HbaseUtil.getInstance().getTable(tableName)
    val get = new Get(Bytes.toBytes(day_course))
    val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)

    if(value == null){
      0l
    } else{
      Bytes.toLong(value)
    }
  }

  def main(args: Array[String]): Unit = {

    println(count("20190315_www.baidu.com_112"))
  }
}
package com.test.project

import com.test.project.dao.{CourseClickCountDAO, CourseSearchClickCountDAO}
import com.test.project.domain.{ClickLog, CourseClickCount, CourseSearchClickCount}
import com.test.project.util.DataUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable.ListBuffer

object ImoocStatStreamingApp {

  def main(args: Array[String]): Unit = {

    if(args.length != 4){
      System.err.println("Usage KafkaReceiverWordCount <zkQuorum> <group> <topic> <numThread>")
      System.exit(1)
    }

    val Array(zkQuorum, group, topic, numThread) = args

    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf,Seconds(60))

    val topicMap = topic.split(",").map((_, numThread.toInt)).toMap

    val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    //测试数据接收
//    message.map(_._2).print()

    val logs = message.map(_._2)
    val cleanData = logs.map(line => {
      val infos = line.split("\t")
      val url = infos(2).split(" ")(1)

      var courseId = 0

      if(url.startsWith("/class")){
        val courseIdHTML = url.split("/")(2)
        courseId = courseIdHTML.substring(0,courseIdHTML.lastIndexOf(".")).toInt
      }
      ClickLog(infos(0), DataUtils.parse(infos(1)), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0)

//    cleanData.print()

    /**
      * 统计今天到现在为止实战课程访问量
      */
    cleanData.map(x => {
      (x.time+"_"+x.courseId, 1)
    }).reduceByKey(_+_).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecodes => {
        val list = new ListBuffer[CourseClickCount]
        partitionRecodes.foreach(pair => {
          list += (CourseClickCount(pair._1, pair._2))
        })
        CourseClickCountDAO.save(list)
      })
    })

    /**
      * 统计从搜索引擎过来的课程访问量
      */

    cleanData.map(x => {
      val referer = x.referer.replaceAll("//","/")
      val splits = referer.split("/")
      var host = ""
      if(splits.length > 2){
        host = splits(1)
      }

      (host, x.courseId, x.time)
    }).filter(_._1!="").map(x => {
      (x._3 + "_" + x._1+ "_" + x._2, 1)
    }).reduceByKey(_+_).foreachRDD(rdd => {
      rdd.foreachPartition(partitionRecodes => {
        val list = new ListBuffer[CourseSearchClickCount]
        partitionRecodes.foreach(pair => {
          list += (CourseSearchClickCount(pair._1, pair._2))
        })
        CourseSearchClickCountDAO.save(list)
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342

推荐阅读更多精彩内容