读书笔记:Kafka简单入门

[TOC]

基础概念

主题(Topic)与分区(Partition)

Kafka中的消息以主题为单位进行归类,主题是一个逻辑上的概念,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

主题以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。

偏移量(offset)是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性

Kafka保证的是分区有序而不是主题有序。

Kafka中的分区可以分布在不同的服务器(broker)上,也就是说,一个主题可以横跨多个broker

多副本(Replica)机制

增加副本数量可以提升容灾能力

同一分区的不同副本中保存的是相同的消息

副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步

leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务

分区中的所有副本统称为AR(Assigned Replicas)。

所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync Replicas)

与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-Sync Replicas),

由此可见,AR=ISR+OSR

在正常情况下, AR=ISR,OSR集合为空。

只有在ISR集合中的副本才有资格被选举为新的leader

HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。

LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset

分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。

安装Kafka

一个典型的 Kafka 体系架构包括若干 Producer、若干Broker、若干 Consumer

ZooKeeper中共有3个角色:leader、follower和observer

安装 Zookeeper

  1. 下载Zookeepr

    注意不要下载源码版本

    https://zookeeper.apache.org/releases.html

    图片.png

  1. 配置环境变量

    export ZOOKEEPER_HOME=/Users/gy/MyMacDocuments/apache-zookeeper-3.6.2(zookeeper路径)
    export PATH="$PATH:$ZOOKEEPER_HOME/bin"
    
  1. 修改配置文件

    cd $ZOOKEEPER_HOME/conf
    cp zoo_sample.cfg zoo.cfg
    

    修改 zoo.cfg 如下:

    # The number of milliseconds of each tick
    # Zookeeper服务器心跳时间,单位为ms
    tickTime=2000
    # The number of ticks that the initial 
    # synchronization phase can take
    # 投票选举新leader的初始化时间
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    # leader与follower心跳检测最大容忍时间,响应超过 syncLimit * tickTime,
    # leader认为follower死掉,从服务器列表中删除follower
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    # 数据目录
    dataDir=/tmp/zookeeper/data
    # 日志目录
    dataLogDir=/tmp/zookeeper/log
    # the port at which the clients will connect
    # Zookeeper对外服务端口
    clientPort=2181
    
  1. 创建数据和日志目录

    mkdir -p /tmp/zookeeper/data
    mkdir -p /tmp/zookeeper/log
    
  1. 在${dataDir}目录(也就是/tmp/zookeeper/data)下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器的编号。

  2. 启动Zookeepr

    zkServer.sh start  // 启动
    zkServer.sh status // 状态
    zkServer.sh stop //停止
    
  1. 集群模式配置(由于我没有三台机器就算了)

    在这3台机器的/etc/hosts文件中添加3台集群的IP地址与机器域名的映射,如

    192.168.0.2  node1  
    192.168.0.3  node2  
    192.168.0.4  node3  
    

然后在这3台机器的zoo.cfg文件中添加以下配置:

```sh

server.0=192.168.0.2:2888:3888
server.1=192.168.0.3:2888:3888
server.2=192.168.0.4:2888:3888
```

server.A=B:C:D

A: 服务器的编号,myid里面的值

B: 服务器ip地址

C: 服务器与集群中的leader服务器交换信息的端口

D: 表示选举时服务器相互通信的端口

在这3台机器上各自执行zkServer.sh start命令来启动服务

安装Kafka

  1. 下载Kafka

    archive.apache.org/dist/kafka


    图片.png
  1. 解压并修改配置文件

    $KAFKA_HOME/conf/server.properties

    # The id of the broker. This must be set to a unique integer for each broker.
    # broker 编号,如果 集群 中有多个broker,则每个broker编号要设置不同
    broker.id=0
    
    # broker 对外提供的服务入口地址
    listeners=PLAINTEXT://:9092
    
    # A comma separated list of directories under which to store log files
    # 存放消息日志文件地址
    log.dirs=/tmp/kafka-logs
    
    # Kafka 所需的 Zookeeper 集群地址
    zookeeper.connect=localhost:2181/kafka
    
  1. 启动Kafka

    在$KAFKA_HOME目录下执行

    ./bin/kafka-server-start.sh ./config/server.properties //启动     
    ./bin/kafka-server-start.sh -daemon ./config/server.properties //后台启动
    /kafka-server-start.sh ./config/server.properties & //后台启动
    
  1. 查看Kafka服务进程是否已经启动

    jps -l
    
图片.png

Kafka中一些重要的服务端参数

参数 介绍
zookeeper.connect 该参数指明broker要连接的ZooKeeper集群的服务地址(包含端口号),必填,如果集群中有多个节点,则用逗号分开
多个节点:这种情况可以使用chroot 路径???(啥意思,之后再了解) 如果不指定chroot,那么默认使用ZooKeeper的根路径
listeners 该参数指明broker监听客户端连接的地址列表,逗号分隔,默认值为null
格式为 protocol://hostname:port ,Kafka当前支持的协议类型有PLAINTEXT、SSL、SASL_SSL等,如果未开启安全认证,则使用简单的PLAINTEXT即可
不指定主机名,则表示绑定默认网卡
如果主机名是0.0.0.0,则表示绑定所有的网卡
advertised.listeners 作用和listeners类似,默认值也为 null
主要用于IaaS(Infrastructure as a Service)环境
使用场景:多块网卡,有公网,有私网,可以设置advertised.listeners参数绑定公网IP供外部客户端使用,而配置listeners参数来绑定私网IP地址供broker间通信使用。
broker.id 用来指定Kafka集群中broker的唯一标识,默认值为-1。如果没有设置,那么Kafka会自动生成一个。
log.dir和log.dirs Kafka 日志文件存放的根目录
log.dirs用来配置多个根目录(以逗号分隔)
log.dirs 的优先级比 log.dir 高
message.max.bytes 指定broker所能接收消息的最大值,默认值为1000012(B)
Producer 发送的消息大于这个参数所设置的值,那么(Producer)就会报出RecordTooLargeException的异常

生产者与消费者

生产者:将消息发布到Kafka主题的分区中

消费者:订阅主题从而消费消息

脚本使用

主题相关脚本 : $KAFKA_HOME/bin/kafka-topics.sh

// 创建一个主题
// --zookeeper 指定了Kafka所连接的Zookeeper服务地址
// --create 创建主题的动作
// --topic 创建的主题的名称
// --replication-factor 副本因子 【用来设置主题的副本数。每个主题可以有多个副本,副本位于集群中不同的broker上,也就是说副本的数量不能超过broker的数量,否则创建主题时会失败。】
// --partitions 分区个数
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-demo --replication-factor 3 --partitions 4

// --describe 展示更多主题的具体信息
./bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe  --topic topic-demo

生产者相关脚本: $KAFKA_HOME/bin/kafka-console-producer.sh

// 发布消息
// --broker-list 连接Kafka集群的地址
// --topic 主题
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo

消费者相关脚本: $KAFKA_HOME/bin/kafka-console-consumer.sh

// 订阅相关topic
// --bootstrap-server 连接的Kafka集群的地址
// --topic 指定主题
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo

Java中使用

  1. 依赖

    <dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka-clients</artifactId>
     <version>2.0.0</version>
    </dependency>
    
  1. 生产者

    /**
     * 生产者
     *
     * @author Jenson
     */
    public class ProducerFastStart {
    
     private static final String BROKER_LIST = "localhost:9092";
     private static final String TOPIC = "topic-demo";
    
     public static void main(String[] args) {
         Properties properties = new Properties();
         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         properties.put("bootstrap.servers", BROKER_LIST);
    
         // 配置生产者客户端参数,并创建生产者实例
         KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
    
         // 构造所需要发送的消息
         ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, "hello kafka!");
    
         // 发送消息
         producer.send(record);
    
         // 关闭生产者客户端
         producer.close();
     }
    }
    
  1. 消费者

    /**
     * 消费者
     *
     * @author Jenson
     */
    public class ConsumerFastStart {
    
     private static final String BROKER_LIST = "localhost:9092";
     private static final String TOPIC = "topic-demo";
     private static final String GROUP_ID = "group.demo";
    
     public static void main(String[] args) {
         Properties properties = new Properties();
         properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put("bootstrap.servers", BROKER_LIST);
    
         // 设置消费组名称
         properties.put("group.id", GROUP_ID);
    
         // 创建一个消费者实例
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    
         // 订阅主题
         consumer.subscribe(Collections.singletonList(TOPIC));
    
         // 循环消费消息
         while (true) {
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
             for (ConsumerRecord<String, String> record : records) {
                 System.out.println(record.value());
             }
         }
    
    
     }
    }
    

遇到的报错

java 使用kafka-clients时报错

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

解决

<!--加入依赖-->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-nop</artifactId>
    <version>1.7.2</version>
</dependency>
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容