go-rabbitmq

应用场景
  • 1 注册!成功后开启2个异步线程:一个发邮件,一个发短信(多个消费者)

  • 2 电商下单的过程:一个处理订单下单结算,一个处理减库存操作,一个物流发货成功。

  • 3 用在团购,秒杀活动中 (控制队列长度,当请求来了,往队列里写入,超过队列的长度,就返回失败,给用户报一个可爱的错误页的等)

  • 4 用户日志处理

  • 5 消息通讯,比如IM 客户端A--->消息队列<----客户端B取

  • 6 削峰填谷应用中:可以多个用户发送消息给MQ,然后统一由mq对接后台和DB,服务不会因为批量的访问请求,瞬间挂掉!提高稳定性!

  • 7 异步提速:比如用户在前端页面下订单的时候,订单系统要去仓库查询库存,要去核实物流状况。。。这种情况下,可以让订单系统直接把访问需求给MQ,统一由MQ来对接后端子服务系统。。。达到解耦提速的效果。服务也不至于因为请求太多而崩溃.

特点和作用
  • 1 消息队列可以处理异步请求,缓解系统的压力,从而达到解耦、削峰等目的,大大提高系统的可用性以及扩展性。

  • 2 Rabbitmq是使用Erlang语言实现AMQP协议的消息中间件,具有易用、高扩展、高可用、持久化等方面特点

模式之间的区别
  • Simple模式:生产者把消息放进队列,消费者监听这个队列,有消息了就消费掉


    image.png
  • Work模式 :生产者把消息放进队列,多个消费者同时监听同一个队列,共同争抢同一个队列里的消息,谁先抢到就谁消费


    image.png
  • Publish模式:生产者把消息放进交换机,交换机再把消息放进队列,消费者再去消费数据


    image.png
结构
  • 1底层是TCP连接,生产者是你的发送程序,创建channel(通道)向指定的exchange(交换机)发送消息,exchange再将消息路由到binding(绑定)的queue(队列)中,然后消费者(处理程序)监听接收queue中的消息进行处理。
  • 2 exchange交换机,相当于是一个消息中转控制中心,负责接收消息然后根据路由规则将消息下发到指定的queue
  • 3 Channel(通道)本质是TCP通信,利用tcp连接创建内部的逻辑连接,收发消息都是通过channel实现!
  • 4 Routing Key路由键,是exchange跟queue之间的桥梁,exchange根据绑定的routing key 下发消息到对应的queue中,决定消息的流向。
  • 5 exchange类型有: fanout, direct, topic, headers 4个类型:
    -- fanout:不需要指定路由键,直接将消息发送给exchange中的所有queue,类似于广播。
    -- direct:将消息发给exchange中指定路由键的queue中,相当于精准投放。
    -- topic:匹配模式,消息下发到匹配规则的routing key的queue中,有''与'#'两个通配符,''表示只匹配一个词,'#'表示匹配多个,比如'user.*'只能匹配到'user.name'而不能匹配到'user.name.wang','user.#'则都可以匹配到
    -- headers:根据消息体的headers匹配,这种用到的比较少,绑定的时候指定相关header参数即可
image.png

发送失败有事务回调机制

  • 重发,根据表中设定的阈值以告警
  • 最好不要单机部署消息队列,一旦挂了数据这套系统就不可用了。可分布式和集群达到容灾机制。
  • 持久化存储,以防单机队列挂了,数据丢失,消费数据的时候,要保证数据不会被重复消费。可根据唯一标识(比如消息ID)做过滤

Windows 10下安装rabbitmq

  • 1下载erlang语言(注意版本匹配):https://www.erlang.org/downloads
  • 2 设置系统环境变量名为: ERLANG_HOME ,值为文件路径: D:\Rabbitmq\erl-24.0
  • 3 设置系统变量PATH : %ERLANG_HOME%\bin,完成后可以在cmd下检验是否成功: erl
  • 4 下载rabbitmq安装包:https://www.rabbitmq.com/install-windows.html
  • 5 管理员身份进入运行命令行, 在/sbin 路径下输入(注意斜杠):.\rabbitmq-plugins enable rabbitmq_management 进行安装
  • 6 在命令行输入启动服务端:start rabbitmq-server 浏览器输入: localhost:15672 就能访问rabbitmq的客户端
消息队列怎么决定,同一时刻哪一个消费者来消费这个消息?
  • 1 RabbitMQ最简单的方式就是时间轮询策略,保证队列先进先出,同一时刻哪一个消费者来消息数据,就先给到哪一个消费者。
消息确认机制,这个机制非常耗费性能!
  • 1 RabbitMQ采用的是消息确认机制,也就是消费者取走消息之后,并且处理完了这个消息!会主动发送ACK给消息队列,消息队列在收到这个ACK之后,才可以在缓存队列里删除这个消息。
auto-ack: true // 只设置消费者就行!
  • 2 如果消费者不发送ACK给消费队列,消息一直在rabbitmq-server上面保留着,只要消费者一启动,会被再次重复消费!
  • 3 如果消息队列一直没收到ACK,上面的消息就会一直存在造成内存泄露。
消息丢失的三种场景:
  • 1 消费者真的就没有回复ACK
  • 2 消费者回复了,但是因为网络原因丢弃了。
  • 3 网络断开或者连接关掉

-- 解决方案:

  • 1 第一种是消费者代码问题,设置好auto-ack: true 就能自动回复消息给消费队列。

  • 2 第二种情况,是因为使用的底层协议有bug导致,因为在连接不断开的前提下,只要消息发出去了,TCP协议会保证消息到达对端的。RabbitMQ并没有对这种场景做处理,因为RabbitMQ并不知道,这个消息是消费者丢失了,还是网络丢失了,当然了它也不应该关心这也业务场景。不过在设计的时候,我们到可以让消费者根据自身业务,添加超时处理机制,例如:消费者在长时间得不到RabbitMQ新的消息的时候,可以尝试去重发上一个消息的ACK消息。

  • 3 第三种情况,RabbitMQ在监测到网络断开或者连接关掉的时候,会主动将这个消息再一次放回到消息队列里面,让后续消费者可以再取一次消息。在代码里写的时候可以编写一个重连机制!

如何保证消息没有被重复消费
  • 1 生产者发送消息的时候携带一个唯一的消息ID,消费者拿到消息后,根据唯一的消息ID去缓存判断,如果缓存没有同一个ID,就存到缓存然后再存DB,有的话就丢弃,避免重复消费同一个消息数据。
  • 2 如果没有写缓存,而是直接存DB ,那么可以拿消息ID去DB查询,没有就插入,有就丢弃。
RabbitMQ如何保证消费者处理的公平性?
  • 1 RabbitMQ采用公平策略做了处理,大体就是在消费者没有将分配到的消息处理完的时候,不在分配新的消息给他,这样就能够让闲一点的消费者去消息队列继续拿新的消息,而忙的消费者一心一意的处理拿到的这个大任务消息。也就是负载均衡算法一个意思!

  • 2 消费者拿到消息后,需要回复ACK给rabbitmq-server,如果一直没有回复,就后面就一直不在给这个消费者分配新的消息。

一旦RabbitMQ挂掉了,该怎么办呢?
  • 1 连接服务可以重启,但是数据呢,这里就涉及到持久化数据存储的问题: durable-true
queue, _ := mq.Channel.QueueDeclare(
        mq.QueueName,
        true, //durable 为true就是持久化存储数据,重启服务的时候会从硬盘里加载恢复
        false,
        false,
        false,
        nil,
      )
go-rabbitmq 代码示例
  • 1 涉及到断线重连,并且限制了重连次数
  • 2 采用Rabbitmq发布/订阅 模式
package models

import (
    "fmt"
    "github.com/streadway/amqp"
)

/*====采用发布/订阅模式===*/
type RabbitMq struct {
    Connection   *amqp.Connection
    Channel      *amqp.Channel
    QueueName    string
    ExchangeName string
    ExchangeType string
    RoutingKey   string
    UserName     string
    PassWord     string
    Address      string
    Port         string
}

func (mq *RabbitMq) InitRabbitmq(num int) {
    defer func() {
        if err := recover(); err != nil {
            num += 1
            mq.InitRabbitmq(num)
        }
    }()
    // 重连多次就退出!
    if num >= 8 {
        return
    }

    var err error
    mq.Connection, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", mq.UserName, mq.PassWord, mq.Address, mq.Port))
    if err != nil {
        fmt.Println("初始化连接rabbitmq服务出现异常:", num, err)
    }

    // 注册管道
    mq.Channel, err = mq.Connection.Channel()
    if err != nil {
        fmt.Println("创建管道失败:", err)
    }

    //监听管道channel错误
    errorChan := make(chan *amqp.Error, 1)
    notifyClose := mq.Channel.NotifyClose(errorChan)

    closeFlags := false
    for {
        select {
        case e := <-notifyClose:
            close(errorChan)
            closeFlags = true
            mq.InitRabbitmq(num)
            fmt.Println("通道错误,进行重新连接服务~", e.Error())
        default:
            closeFlags = true
        }
        if closeFlags {
            break
        }
    }

}

// 关闭RabbitMQ连接 先关闭管道,再关闭链接
func (mq *RabbitMq) closeMq() {
    if err := mq.Channel.Close(); err != nil {
        fmt.Printf("mq管道关闭失败:%s \n", err)
    }
    if err := mq.Connection.Close(); err != nil {
        fmt.Printf("mq链接关闭失败:%s \n", err)
    }
}

//2 创建生产者队列,交换机。并且按照路由规则进行绑定
func (mq *RabbitMq) Produce(message string) {
    if mq.Connection == nil{
       mq.InitRabbitmq(0)
    }
    // 队列跟交换机进行绑定,交换机模式, 参数(交换机名称,交换机类型,交换机,是否等待)
    _ = mq.Channel.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)

    queue, err := mq.Channel.QueueDeclare(
        mq.QueueName,
        true,
        false,
        false,
        false,
        nil,
    )
    if err != nil {
        fmt.Println("生产消息发生异常Produce:__", err)
        return
    }

    if err := mq.Channel.QueueBind(queue.Name, mq.RoutingKey, mq.ExchangeName, false, nil); err != nil {
        fmt.Println("绑定交换机出现异常__:", err)
    }

    if err := mq.Channel.Publish(mq.ExchangeName, mq.RoutingKey, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(message),
    }); err != nil {
        fmt.Println("发送消息失败: ", err)
    }
}

//消费者
func (mq *RabbitMq) Consumer() {
    queue, _ := mq.Channel.QueueDeclare(
        mq.QueueName,
        true,
        false,
        false,
        false,
        nil,
    )
    _ = mq.Channel.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, true, nil)
    messageChan, _ := mq.Channel.Consume(queue.Name, "", true, false, false, false, nil)
    c := make(chan bool)
    go func() {
        for info := range messageChan {
            fmt.Println("消费者消费的消息内容__:", string(info.Body))
        }
    }()
    <-c
}

func NewRabbitmq(queueName, exchangeName, exchangeType, userName, passWord, address, port string) RabbitMq {
    return RabbitMq{
        QueueName:    queueName,
        ExchangeName: exchangeName,
        ExchangeType: exchangeType,
        UserName:     userName,
        PassWord:     passWord,
        Address:      address,
        Port:         port,
    }
}

func Demo() {
    mq := NewRabbitmq("sosmyheart", "logs", "direct", "guest", "guest", "localhost", "5672")
    mq.InitRabbitmq(0)
    mq.Produce("你的世界独一无")
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,607评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,047评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,496评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,405评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,400评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,479评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,883评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,535评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,743评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,544评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,612评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,309评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,881评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,891评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,136评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,783评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,316评论 2 342

推荐阅读更多精彩内容

  • 1. RabbitMQ的介绍和安装1.1 介绍1.1.1 是什么?1.1.2 使用场景1.1.3 市面上常见消息队...
    程序员Darker阅读 428评论 0 0
  • 一、RabbitMQ的特点 RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开...
    小波同学阅读 79,491评论 9 36
  • RabbitMQ 概述 RabbitMQ是采用Erlang编程语言实现了高级消息队列协议AMQP (Advance...
    楚江云阅读 6,626评论 0 10
  • 应用场景 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种: 1.串行的方式 2.并行的...
    lijun_m阅读 1,805评论 0 3
  • 一、RabbitMQ介绍 RabbitMQ RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不...
    77岁重病程序员阅读 369评论 0 0