kafka简单入门及与spring boot整合

这周来学习一点新的技术吧,感觉自己消息队列方面用的挺少的,所以这周学习下这方面的知识,我选择的是很流行的Kafka,今天抽时间就简单的了解一下。任何一门技术的学习都不是简简单单学习quick start就完事的,以前自己就容易犯这样的错误,感觉入个门就行,其实这样是错误的,入门只是开始。不过今天还是要先从quick start开始,先了解一些基本的概念,然后和spring boot整合一下,看看在项目中如何使用Kafka。

1、基本介绍

Kafka是一个分布式的流平台,主要有三个重要功能:

1、发布和订阅记录流,类似于消息队列或企业消息传递系统。
2、以容错的持久方式存储记录流
3、当记录流发生时处理它们。

Kafka通常用于两大类应用:
1、构建可在系统或应用程序之间可靠获取数据的实时流数据管道
2、构建转换或响应数据流的实时流应用程序
Kafka的核心API主要有四个:Producer API、Consumer API、Streams API、Connector API。根据字面意思也很好理解,就是生产者、消费者、流、连接器。
Producer:允许应用程序将记录流发布到一个或多个topics,关于什么是topics后面会讲到。
Consumer:则允许应用程序订阅一个或多个topics并处理生成的记录流。
Streams:允许应用程序充当流处理器,消费来自一个或多个topics的输入流,并生产出一个或多个输出topics的输出流,从而有效地将输入流转换为输出流。简单点理解可以认为它是一个转换器或者加工厂,将接收到输入流进行一定处理,然后再输出出去。
Connector:允许构建和运行可重用生产者或消费者,这些生产者和消费者将topics连接到现有应用程序或数据系统。


topics:topics是Kafka中很重要的一个概念,不管生产者生产数据还是消费者消费数据都离不开topcis。topics或者说主题是发布记录的类别或者别名,topics是一个多订阅者,也就是说一个topics可以有任意多个消费者订阅写入它的数据。
对于每一个topic,Kafka的集群都维护着一个分区日志文件,下面是官网的一张图片:


图-1.png

上面这个图片可以看出一个topic有3个分区,分别是partition0、partition1、partition2。每个分区都是一个有序的,不可变的记录序列,不断附加到结构化的提交日志中。并且分区中的每条记录都被分配一个称为offset的有序ID号,这个ID号唯一地标识分区中的每条记录。上图中显示的0、1、2....这些都是ID号,一个ID对应着一条数据,类似于数据库中的数据行。
Kafka集群通过使用一个可配置过期时间来持久化保留已发布的消息记录,无论这些记录是否被消费。比如,我设置的过期时间时两天,那么在记录发布之后的两天内我都是可以使用的,超过两天的话这些记录将会被清除掉。Kafka的性能在数据大小方面实际上是恒定的,也就是说性能和数据大小无关,因此长时间保存数据并不是一个问题。
实际上,基于每个消费者保留的唯一元数据就是该消费者在日志中的偏移量或位置。偏移量是由消费者控制的,通常情况下,消费者读取数据的时候会线性提高其偏移量,这个很好理解,因为偏移量本身就是线性增加的。但是,因为偏移量实际由消费者控制,所以其实消费者可以读取任意偏移量上的数据记录。
日志中的分区有几个作用:1、它们允许日志扩展到超出适合单个服务器的大小。每个单独的分区必须适合托管它的服务器,但主题可能有许多分区,因此它可以处理任意数量的数据。2、它们作为并行性的一个单元,更多的是在一点上。
关于理论的知识先了解到这里,主要了解它们的作用就行了,另外就是要多看官方的文档。下面试着启动Kafka并作一点简单操作。

2、安装和启动

首先到官网下载Kafka的压缩包,然后解压。因为Kafka的启动依赖于zookeeper的,当然这个zookeeper可以说Kafka自带的,也可以是你自己配置,有兴趣可以使用自己安装的zookeeper来启动,我就偷懒了。首先切换到Kafka的bin目录下,在这个目录下我们看到有很多的脚本文件,我们先启动zookeeper。

图-2.png

ypcfly@linux:~/ypcfly/kafka/kafka_2.11-2.1.0/bin$ ./zookeeper-server-start.sh 
USAGE: ./zookeeper-server-start.sh [-daemon] zookeeper.properties
ypcfly@linux:~/ypcfly/kafka/kafka_2.11-2.1.0/bin$ ./zookeeper-server-start.sh ../config/zookeeper.properties 

这里提示需要指定zookeeper的配置文件,Kafka的配置文件都是在config目录下,所以指定好配置文件再重启zookeeper。


图-3.png

接下来是启动Kafka的服务器,和zookeeper一样,也要指定Kafka server的配置文件。重新开一个窗口,执行下面的命令

./kafka-server-start.sh ../config/server.properties
图-4.png

这时候整个Kafka就启动成功了,我们先不着急进行操作,简单看下Kafka的配置文件,都有哪些比较重要的内容。

broker.id=0 # broker 可以近似的理解为Kafka服务器,broker中文为经纪人、代理人的意思
listeners=PLAINTEXT://:9092 #监听的端口号,默认9092
log.dirs=/tmp/kafka-logs #日志文件存储位置,可以指定多个,有逗号分隔
num.partitions=1 #每个topic分区数量,默认1
log.retention.hours=168 # 日志保留时间,也就是过期策略,默认168小时

除了这些以外还有一些配置,就不再详细说明了。下面我们创建一个topic,并使用生产者和消费者来对记录进行操作。先创建一个topic,并指定相关参数

./kafka-topics.sh --create --zookeeper localhost:2181 --topic my_topic --replication-factor 1 --partitions 1

下面解释下命令中各参数的意思:--zookeeper,应该不需要解释了吧,就是zookeeper的服务地址和端口号。--create表示创建topic(还有delete),--topic后面是创建的topic的名称,这里我创建的时候有个警告,说不能使用"."和"_",不过只是警告,但是最好还是不要用这两个符号。--partitions后跟着这个topic创建的分区的数量。--replication-factor后面跟着是每一个分区副本的数量。通过list命令可以查看topic的数量

./kafka-topics.sh --list --zookeeper localhost:2181 

接下来使用一个生产者向上面创建的topic发送数据,然后使用消费者接受topic的数据。先创建生产者,再创建消费者

./kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic #生产者
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my_topic --from-beginning #消费者

因为我本地命令模式运行消费者有点问题,不知道为什么我生产者已经发出消息了,但是执行消费的命令控制台什么不显示,不知道是不是Kafka出错了,昨天刚安装好的时候还是可以的,清理了日志文件重启依然不行。

3、与spring boot整合

关于spring与Kafka整合,spring官方有专门的文档,有兴趣可以参考spring for apache kafka,而且也有例子,当然也可以自己创建。用idea创建一个spring boot项目,选择相关模块,并添加相关依赖。项目配置文件如下:

server.port=8090
# 数据库配置
spring.datasource.url=jdbc:postgresql://localhost:5432/test_db?useSSL=false&characterEncoding=utf8
spring.datasource.driver-class-name=org.postgresql.Driver
spring.datasource.username=postgres
spring.datasource.password=123456

# mybatis 配置
mybatis.type-aliases-package=com.ypc.kafka.entity
mybatis.mapper-locations=classpath:mapper/*.xml

# kafka配置
spring.kafka.consumer.group-id=custom_consumer
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-offset-reset=earliest

主要是指定Kafka消费者组的ID,已经服务器的地址和端口号,关于consumer、producer的那几个配置我也不是特别清楚,这个有时间需要看看文档。
这个项目主要是把Kafka作为消息队列使用的,所以我就想使用生产者发送消息,然后由监听的消费者处理消息,太复杂我这里也不会。
controller接受前台传过得参数,然后保存到数据库,成功后则调用发送消息的service将这个数据传递给消费者,消费者进行其他处理,自己都感觉会不会太简单了点......,但是复杂的操作自己也还没学。
自己就把消息发送者和消费者的代码贴一下吧

@Service
public class SenderService {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 发送消息
     * @param topic
     * @param message
     */
    public void sendMessage(String topic, String message){
        kafkaTemplate.send(topic,message);
    }

}

这里发送消息使用的是kafkaTemplate,spring对第三方的支持好像都喜欢使用***Template,比如RedisTemplate、RestTemplate等等。kafkaTemplate重写了多个send方法,自己根据情况选择就好了。我还是建议多看看spring的文档。
消费者,消费者的话主要是使用一个注解,即@KafkaListener,这个注解参数很多,这里就不再详细说明了,我是监听一个topic的消息,当然也可以监听多个topic。

@Service
public class ConsumerService {

    /**
     * 监听消息
     * @param record
     */
    @KafkaListener(topics = "custom_kafka")
    public void listen(ConsumerRecord<?, ?> record) {

        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("topics=").append(record.topic()).append(", offset=").append(record.offset())
                .append(", value=").append(record.value());

        String value = (String) record.value()
                // 
        System.out.println(stringBuilder.toString());
    }

}

listen方法里面的ConsumerRecord包装了消息记录的很多信息,比如偏移量,消息值,消息头,主题等等。我们可以从中取出我们需要的数据,可以看下具体的代码:

public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = -1L;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;
    private volatile Long checksum;

    .....省略代码
}

使用idea的自带的rest client调用我的接口,传递的json参数如下:

{   "age":17,   "mobile":"13885474512",   "username":"kafka" }

最后输出结果如下:

topics=custom_kafka, offset=0, value={"id":6,"username":"kafka","age":17,"mobile":"13885474512"}

也就是说消费者成功接收到了存入数据库成功的数据,接下怎么对这些数据,就看具体的业务需要了。当然我这里只有一条数据,所以offset为0。像电商经常涉及到的秒杀业务,也是将数据写入消息队列,然后再转给具体的服务进行处理。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容

  • kafka的定义:是一个分布式消息系统,由LinkedIn使用Scala编写,用作LinkedIn的活动流(Act...
    时待吾阅读 5,291评论 1 15
  • Kafka提供的主要功能 生产者 ——>消息队列 <——消费者 所谓消息对象,本质上就是由生产者向消息队列不断发送...
    leofight阅读 1,625评论 0 5
  • 一、基本概念 介绍 Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独...
    ITsupuerlady阅读 1,610评论 0 9
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,424评论 0 34
  • 不觉间,今年还剩下44天了。回望年初,你是否还记得自己在年初许下的愿望,看完X本书,去健身瘦到N斤,要去几十个地方...
    WiFi密码是多少阅读 285评论 2 4