Golang,kafka实现消息推拉

Kafka的安装与启动

kafka中涉及的名词
  1. 消息记录:由一个key,一个value和一个时间戳构成,消息最终存储在主题下的分区中,记录在生产中称为生产者记录,在消费者中称为消费记录。Kafka集群保持了所有发布的消息,直到它们过期,无论消息是否被消费了,在一个可配置的时间段内,Kafka集群保留了所有发布的消息。比如消息的保存策略被设置为2天,那么在一个消息被发布的两天时间内,它都是可以被消费的。Kafka的性能是和数据量无关的常量级的,所以保留太多数据并不是问题
  2. 生成者:生产者用于发布消息
  3. 消费者:消费者用于订阅消息
  4. 消费者组:相同的groupID的消费者将视为同一个消费者组,每个消费者都需要设置一个组id,每条消息只能被consumer group中的一个Consumer消费,但是可以被多个consumer group消费
  5. 主题(topic):消息的一种逻辑分组,用于对消息分门别类,每一类消息称之为一个主题,相同主题的消息放在一个队列中
  6. 分区(partition):消息的一种物理分组,一个主题被拆成多个分区,每一个分区就是一个顺序的,不可变的消息队列,并且可以持续添加,分区中的每个消息都被分配了一个唯一的id,称之为偏移量(offset),在每个分区中偏移量都是唯一的。每个分区对应一个逻辑log,有多个segment组成
  7. 偏移量:分区中每个消息都有一个唯一的Id,称之为偏移量,代表已经消费的位置
  8. 代理(broker):一台kafka服务器称之为一个broker
  9. 副本(replica):副本只是一个分区(partition)的备份。副本不读取或写入数据。它们用于防止数据丢失
  10. 领导者:leader是负责给定分区的所有读取和写入的节点
  11. 追随者:跟随领导者指令的节点被称为Follower。
  12. zookeeper:Kafka代理是无状态的,所以它们使用Zookeeper来维护它们的集群状态。Zookeeper用于管理和协调Kafka代理
kafka功能
  • 发布订阅:生产者生产消息(数据流),将消息发送给kafka指定的主题队列中,也可以发送到topic中的指定分区中,消费者从kafka的指定队列中获取消息,然后来处理消息
一. Mac版安装
brew install kafka

安装kafka需要依赖zookeeper的,所以安装kafka的时候也会包含zooker

  • kafka的安装目录:/usr/local/Cellar/kafka
  • kafka的配置文件目录:/usr/local/etc/kafka
  • kafka服务的配置文件:/usr/local/etc/kafka/server.properties
  • zookeeper配置文件:/usr/local/etc/kafka/zookeeper.properties

server.properties中重要配置

  1. broker.id=0
  2. listeners=PLAINTEXT://:9092
  3. advertised.listeners=PLAINTEXT://127.0.0.1:9092
  4. log.dirs=/usr/local/var/lib/kafka-logs

zookeeper.properties重要配置

  1. dataDir=/usr/local/var/lib/zookeeper
  2. clientPort=2181
  3. maxClientCnxns=0
二. 启动zookeeper

新创建终端启动zookeeper

  1. cd /usr/local/Cellar/kafka/2.1.0
  2. ./bin/zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties
  3. 打印台显示:INFO Reading configuration from: /usr/local/etc/kafka/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  4. ...即是启动成功
三.启动kafka

新创建终端启动kafka(启动kafka之前必须先启动zookeeper)

  1. cd /usr/local/Cellar/kafka/2.1.0
  2. ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
  3. 打印台显示:INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
  4. ...即启动成功
  5. 启动了kafka之后,zookeeper端会报一些Error:KeeperErrorCode = NoNode for /config/topics/test之类的错误,这个是没有问题的,这是因为kafka向zookeeper发送了关于该路径的一些请求信息,但是不存在,所以这是没有问题的
四.创建topic

新创建终端

  1. cd /usr/local/Cellar/kafka/2.1.0
  2. 创建一个名为“test”的主题:./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  3. 查看所有的topic:./bin/kafka-topics --list --zookeeper localhost:2181
  4. 查看某个topic的信息,比如test:./bin/kafka-topics --describe --zookeeper localhost:2181 --topic test
五.发送消息

新创建一个终端,作为生产者,用于发送消息,每一行就是一条信息,将消息发送到kafka服务器

  1. cd /usr/local/Cellar/kafka/2.1.0
  2. ./bin/kafka-console-producer --broker-list localhost:9092 --topic test
  3. send one message
  4. send two message
六.消费消息(接受消息)

新创建一个终端作为消费者,接受消息

  1. cd /usr/local/Cellar/kafka/2.1.0
  2. ./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning
  3. send one message
  4. send two message(这些便是从生产者获得的消息)
注意:发送消息与接受消息必须启动kafka与zookeeper

GoLang实现kafka的信息发布与订阅

生产者

import (
    "fmt"
    "github.com/Shopify/sarama"
)


func main() {
    config := sarama.NewConfig()
    // 等待服务器所有副本都保存成功后的响应
    config.Producer.RequiredAcks = sarama.WaitForAll
    // 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
    config.Producer.Partitioner = sarama.NewRandomPartitioner
    // 是否等待成功和失败后的响应
    config.Producer.Return.Successes = true

    // 使用给定代理地址和配置创建一个同步生产者
    producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        panic(err)
    }

    defer producer.Close()

    //构建发送的消息,
    msg := &sarama.ProducerMessage {
        //Topic: "test",//包含了消息的主题
        Partition: int32(10),//
        Key:        sarama.StringEncoder("key"),//
    }

    var value string
    var msgType string
    for {
        _, err := fmt.Scanf("%s", &value)
        if err != nil {
            break
        }
        fmt.Scanf("%s",&msgType)
        fmt.Println("msgType = ",msgType,",value = ",value)
        msg.Topic = msgType
        //将字符串转换为字节数组
        msg.Value = sarama.ByteEncoder(value)
        //fmt.Println(value)
        //SendMessage:该方法是生产者生产给定的消息
        //生产成功的时候返回该消息的分区和所在的偏移量
        //生产失败的时候返回error
        partition, offset, err := producer.SendMessage(msg)

        if err != nil {
            fmt.Println("Send message Fail")
        }
        fmt.Printf("Partition = %d, offset=%d\n", partition, offset)
    }
}

消费者

import (
    "fmt"
    "github.com/Shopify/sarama"
    "sync"
    )
var (
    wg  sync.WaitGroup
)
func main() {
    // 根据给定的代理地址和配置创建一个消费者
    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, nil)
    if err != nil {
        panic(err)
    }
    //Partitions(topic):该方法返回了该topic的所有分区id
    partitionList, err := consumer.Partitions("test")
    if err != nil {
        panic(err)
    }

    for partition := range partitionList {
        //ConsumePartition方法根据主题,分区和给定的偏移量创建创建了相应的分区消费者
        //如果该分区消费者已经消费了该信息将会返回error
        //sarama.OffsetNewest:表明了为最新消息
        pc, err := consumer.ConsumePartition("test", int32(partition), sarama.OffsetNewest)
        if err != nil {
            panic(err)
        }
        defer pc.AsyncClose()
        wg.Add(1)
        go func(sarama.PartitionConsumer) {
            defer wg.Done()
            //Messages()该方法返回一个消费消息类型的只读通道,由代理产生
            for msg := range pc.Messages() {
                fmt.Printf("%s---Partition:%d, Offset:%d, Key:%s, Value:%s\n", msg.Topic,msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
        }(pc)
    }
    wg.Wait()
    consumer.Close()
}

kafka使用场景

  • kafka的应用很广泛,在这里简单介绍几种

    • 服务解耦

      比如我们发了一个帖子,除了写入数据库之外还有很多联动操作,比如给关注这个用户的人发送通知,推送到首页的时间线列表,如果用代码实现的话,发帖服务就要调用通知服务,时间线服务,这样的耦合很大,并且如果增加一个功能依赖发帖,除了要增加新功能外还要修改发帖代码。

      解决方法:引入kafka,将发完贴的消息放入kafka消息队列中,对这个主题感兴趣的功能就自己去消费这个消息,那么发帖功能就能够完全独立。同时即使发帖进程挂了,其他功能还能够使用,这样可以将bug隔离在最小范围内

    • 流量削峰

    流量削峰在消息队列中也是常用场景,一般在秒杀或团购活动中使用比较广泛。当流量太大的时候达到服务器瓶颈的时候可以将事件放在kafka中,下游服务器当接收到消息的时候自己去消费,有效防止服务器被挤垮

    • 消息通讯

    消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯中,比如客户端A跟客户端B都使用同一队列进行消息通讯,客户端A,客户端B,客户端N都订阅了同一个主题进行消息发布和接受不了实现类似聊天室效果

参考代码

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342