应用场景
1 注册!成功后开启2个异步线程:一个发邮件,一个发短信(多个消费者)
2 电商下单的过程:一个处理订单下单结算,一个处理减库存操作,一个物流发货成功。
3 用在团购,秒杀活动中 (控制队列长度,当请求来了,往队列里写入,超过队列的长度,就返回失败,给用户报一个可爱的错误页的等)
4 用户日志处理
5 消息通讯,比如IM 客户端A--->消息队列<----客户端B取
6 削峰填谷应用中:可以多个用户发送消息给MQ,然后统一由mq对接后台和DB,服务不会因为批量的访问请求,瞬间挂掉!提高稳定性!
7 异步提速:比如用户在前端页面下订单的时候,订单系统要去仓库查询库存,要去核实物流状况。。。这种情况下,可以让订单系统直接把访问需求给MQ,统一由MQ来对接后端子服务系统。。。达到解耦提速的效果。服务也不至于因为请求太多而崩溃.
特点和作用
1 消息队列可以处理异步请求,缓解系统的压力,从而达到解耦、削峰等目的,大大提高系统的可用性以及扩展性。
2 Rabbitmq是使用Erlang语言实现AMQP协议的消息中间件,具有易用、高扩展、高可用、持久化等方面特点
模式之间的区别
-
Simple模式:生产者把消息放进队列,消费者监听这个队列,有消息了就消费掉
-
Work模式 :生产者把消息放进队列,多个消费者同时监听同一个队列,共同争抢同一个队列里的消息,谁先抢到就谁消费
-
Publish模式:生产者把消息放进交换机,交换机再把消息放进队列,消费者再去消费数据
结构
- 1底层是TCP连接,生产者是你的发送程序,创建channel(通道)向指定的exchange(交换机)发送消息,exchange再将消息路由到binding(绑定)的queue(队列)中,然后消费者(处理程序)监听接收queue中的消息进行处理。
- 2 exchange交换机,相当于是一个消息中转控制中心,负责接收消息然后根据路由规则将消息下发到指定的queue
- 3 Channel(通道)本质是TCP通信,利用tcp连接创建内部的逻辑连接,收发消息都是通过channel实现!
- 4 Routing Key路由键,是exchange跟queue之间的桥梁,exchange根据绑定的routing key 下发消息到对应的queue中,决定消息的流向。
- 5 exchange类型有: fanout, direct, topic, headers 4个类型:
-- fanout:不需要指定路由键,直接将消息发送给exchange中的所有queue,类似于广播。
-- direct:将消息发给exchange中指定路由键的queue中,相当于精准投放。
-- topic:匹配模式,消息下发到匹配规则的routing key的queue中,有''与'#'两个通配符,''表示只匹配一个词,'#'表示匹配多个,比如'user.*'只能匹配到'user.name'而不能匹配到'user.name.wang','user.#'则都可以匹配到
-- headers:根据消息体的headers匹配,这种用到的比较少,绑定的时候指定相关header参数即可
发送失败有事务回调机制
- 重发,根据表中设定的阈值以告警
- 最好不要单机部署消息队列,一旦挂了数据这套系统就不可用了。可分布式和集群达到容灾机制。
- 持久化存储,以防单机队列挂了,数据丢失,消费数据的时候,要保证数据不会被重复消费。可根据唯一标识(比如消息ID)做过滤
Windows 10下安装rabbitmq
- 1下载erlang语言(注意版本匹配):https://www.erlang.org/downloads
- 2 设置系统环境变量名为: ERLANG_HOME ,值为文件路径: D:\Rabbitmq\erl-24.0
- 3 设置系统变量PATH : %ERLANG_HOME%\bin,完成后可以在cmd下检验是否成功: erl
- 4 下载rabbitmq安装包:https://www.rabbitmq.com/install-windows.html
- 5 管理员身份进入运行命令行, 在/sbin 路径下输入(注意斜杠):.\rabbitmq-plugins enable rabbitmq_management 进行安装
- 6 在命令行输入启动服务端:start rabbitmq-server 浏览器输入: localhost:15672 就能访问rabbitmq的客户端
消息队列怎么决定,同一时刻哪一个消费者来消费这个消息?
- 1 RabbitMQ最简单的方式就是时间轮询策略,保证队列先进先出,同一时刻哪一个消费者来消息数据,就先给到哪一个消费者。
消息确认机制,这个机制非常耗费性能!
- 1 RabbitMQ采用的是消息确认机制,也就是消费者取走消息之后,并且处理完了这个消息!会主动发送ACK给消息队列,消息队列在收到这个ACK之后,才可以在缓存队列里删除这个消息。
auto-ack: true // 只设置消费者就行!
- 2 如果消费者不发送ACK给消费队列,消息一直在rabbitmq-server上面保留着,只要消费者一启动,会被再次重复消费!
- 3 如果消息队列一直没收到ACK,上面的消息就会一直存在造成内存泄露。
消息丢失的三种场景:
- 1 消费者真的就没有回复ACK
- 2 消费者回复了,但是因为网络原因丢弃了。
- 3 网络断开或者连接关掉
-- 解决方案:
1 第一种是消费者代码问题,设置好auto-ack: true 就能自动回复消息给消费队列。
2 第二种情况,是因为使用的底层协议有bug导致,因为在连接不断开的前提下,只要消息发出去了,TCP协议会保证消息到达对端的。RabbitMQ并没有对这种场景做处理,因为RabbitMQ并不知道,这个消息是消费者丢失了,还是网络丢失了,当然了它也不应该关心这也业务场景。不过在设计的时候,我们到可以让消费者根据自身业务,添加超时处理机制,例如:消费者在长时间得不到RabbitMQ新的消息的时候,可以尝试去重发上一个消息的ACK消息。
3 第三种情况,RabbitMQ在监测到网络断开或者连接关掉的时候,会主动将这个消息再一次放回到消息队列里面,让后续消费者可以再取一次消息。在代码里写的时候可以编写一个重连机制!
如何保证消息没有被重复消费
- 1 生产者发送消息的时候携带一个唯一的消息ID,消费者拿到消息后,根据唯一的消息ID去缓存判断,如果缓存没有同一个ID,就存到缓存然后再存DB,有的话就丢弃,避免重复消费同一个消息数据。
- 2 如果没有写缓存,而是直接存DB ,那么可以拿消息ID去DB查询,没有就插入,有就丢弃。
RabbitMQ如何保证消费者处理的公平性?
1 RabbitMQ采用公平策略做了处理,大体就是在消费者没有将分配到的消息处理完的时候,不在分配新的消息给他,这样就能够让闲一点的消费者去消息队列继续拿新的消息,而忙的消费者一心一意的处理拿到的这个大任务消息。也就是负载均衡算法一个意思!
2 消费者拿到消息后,需要回复ACK给rabbitmq-server,如果一直没有回复,就后面就一直不在给这个消费者分配新的消息。
一旦RabbitMQ挂掉了,该怎么办呢?
- 1 连接服务可以重启,但是数据呢,这里就涉及到持久化数据存储的问题: durable-true
queue, _ := mq.Channel.QueueDeclare(
mq.QueueName,
true, //durable 为true就是持久化存储数据,重启服务的时候会从硬盘里加载恢复
false,
false,
false,
nil,
)
go-rabbitmq 代码示例
- 1 涉及到断线重连,并且限制了重连次数
- 2 采用Rabbitmq发布/订阅 模式
package models
import (
"fmt"
"github.com/streadway/amqp"
)
/*====采用发布/订阅模式===*/
type RabbitMq struct {
Connection *amqp.Connection
Channel *amqp.Channel
QueueName string
ExchangeName string
ExchangeType string
RoutingKey string
UserName string
PassWord string
Address string
Port string
}
func (mq *RabbitMq) InitRabbitmq(num int) {
defer func() {
if err := recover(); err != nil {
num += 1
mq.InitRabbitmq(num)
}
}()
// 重连多次就退出!
if num >= 8 {
return
}
var err error
mq.Connection, err = amqp.Dial(fmt.Sprintf("amqp://%s:%s@%s:%s/", mq.UserName, mq.PassWord, mq.Address, mq.Port))
if err != nil {
fmt.Println("初始化连接rabbitmq服务出现异常:", num, err)
}
// 注册管道
mq.Channel, err = mq.Connection.Channel()
if err != nil {
fmt.Println("创建管道失败:", err)
}
//监听管道channel错误
errorChan := make(chan *amqp.Error, 1)
notifyClose := mq.Channel.NotifyClose(errorChan)
closeFlags := false
for {
select {
case e := <-notifyClose:
close(errorChan)
closeFlags = true
mq.InitRabbitmq(num)
fmt.Println("通道错误,进行重新连接服务~", e.Error())
default:
closeFlags = true
}
if closeFlags {
break
}
}
}
// 关闭RabbitMQ连接 先关闭管道,再关闭链接
func (mq *RabbitMq) closeMq() {
if err := mq.Channel.Close(); err != nil {
fmt.Printf("mq管道关闭失败:%s \n", err)
}
if err := mq.Connection.Close(); err != nil {
fmt.Printf("mq链接关闭失败:%s \n", err)
}
}
//2 创建生产者队列,交换机。并且按照路由规则进行绑定
func (mq *RabbitMq) Produce(message string) {
if mq.Connection == nil{
mq.InitRabbitmq(0)
}
// 队列跟交换机进行绑定,交换机模式, 参数(交换机名称,交换机类型,交换机,是否等待)
_ = mq.Channel.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
queue, err := mq.Channel.QueueDeclare(
mq.QueueName,
true,
false,
false,
false,
nil,
)
if err != nil {
fmt.Println("生产消息发生异常Produce:__", err)
return
}
if err := mq.Channel.QueueBind(queue.Name, mq.RoutingKey, mq.ExchangeName, false, nil); err != nil {
fmt.Println("绑定交换机出现异常__:", err)
}
if err := mq.Channel.Publish(mq.ExchangeName, mq.RoutingKey, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(message),
}); err != nil {
fmt.Println("发送消息失败: ", err)
}
}
//消费者
func (mq *RabbitMq) Consumer() {
queue, _ := mq.Channel.QueueDeclare(
mq.QueueName,
true,
false,
false,
false,
nil,
)
_ = mq.Channel.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, true, nil)
messageChan, _ := mq.Channel.Consume(queue.Name, "", true, false, false, false, nil)
c := make(chan bool)
go func() {
for info := range messageChan {
fmt.Println("消费者消费的消息内容__:", string(info.Body))
}
}()
<-c
}
func NewRabbitmq(queueName, exchangeName, exchangeType, userName, passWord, address, port string) RabbitMq {
return RabbitMq{
QueueName: queueName,
ExchangeName: exchangeName,
ExchangeType: exchangeType,
UserName: userName,
PassWord: passWord,
Address: address,
Port: port,
}
}
func Demo() {
mq := NewRabbitmq("sosmyheart", "logs", "direct", "guest", "guest", "localhost", "5672")
mq.InitRabbitmq(0)
mq.Produce("你的世界独一无")
}