SpringBoot整合RabbitMQ,常用操作

本文介绍三种常用操作,基于spring-boot-starter-amqp依赖

  • 手动ack
  • work模式(能者多劳)
  • 消息格式转换

手动ack

消息确认模式

在amqp协议中消息确认有两种模式

  1. 自动确认模式(automatic acknowledgement model)当消息代理将消息发送给应用后立即删除

  2. 显式确认模式(explicit acknowledgement model)待应用发送一个确认回执后再删除消息

而在spring-boot-starter-amqp,spring定义了三种

  1. NONE 没有ack的意思,对应rabbitMQ的自动确认模式

  2. MANUAL 手动模式,对应rabbitMQ的显式确认模式

  3. AUTO 自动模式,对应rabbitMQ的显式确认模式

首先注意的是spring-amqp中的自动模式与rabbit中的自动模式是不一样的,其次,在spring-amqp中MANUAL 与 AUTO的关系有点类似于在spring中手动提交事务与自动提交事务的区别,一个是手动发送ack一个是在方法执行完,没有异常的情况下自动发送ack

代码实现

三个步骤

  1. 设置消费者的消息确认模式

  2. 手动确认/拒绝消息

  3. 设置消息拒绝策略

设置消费者的消息确认模式:

@Configuration
public class ListenerConfig {

   @Bean("myListenerFactory")
    public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory containerFactory= 
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        //设置消费者的消息确认模式
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return containerFactory;
    }
}

手动确认/拒绝消息:

@Component
@RabbitListener(
        containerFactory = "myListenerFactory",
        bindings = @QueueBinding(
            value = @Queue("myManualAckQueue"),
            exchange = @Exchange(value = "myManualAckExchange", type = ExchangeTypes.DIRECT),
            key = "mine.manual"))
public class MyAckListener {

    @RabbitHandler
    public void onMessage(@Payload String msg, 
                          @Headers Map<String, Object> headers, 
                          Channel channel) throws Exception{
        try {
            System.out.println(msg);
            //消息确认,(deliveryTag,multiple是否确认所有消息)
            channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
        } catch (Exception e) {
            //消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
            channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);
            // 拒绝一条
            // channel.basicReject();
        }
    }
}

设置消息拒绝策略:

拒绝策略是指,当消息被消费者拒绝时该如何处理,丢弃或者是重新回到队列.

在MANUAL 模式下,在拒绝消息的方法中设置

//消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);

在AUTO 模式下可通过RabbitListenerContainerFactory或是ListenerContainer设置,如

@Bean("myListenerFactory")
    public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory){
        SimpleRabbitListenerContainerFactory containerFactory=
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory);
        //自动ack
        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        //拒绝策略,true回到队列 false丢弃
        containerFactory.setDefaultRequeueRejected(false);
        return containerFactory;
    }

需要注意的是,默认的拒绝策略是回到队列,所以,如果队列只有一个消费者的话就会产生死循环

work模式-能者多劳

默认情况下,如果有多个消费者在一个队列上,消息是公平的分发给消费者的,一人一个轮着来,不考虑每个消费者之间的处理能力的差异,这可以通过设置预处理消息数(prefetchCount)缓解,或是使用work-能者多劳模式

work-能者多劳模式: 每个消费者的预处理消息数(prefetchCount)都设置为1,每个消费者消息确认都为显式确认模式,即MANUAL,或是AUTO

如下,两个消费者消费同一个queue上的消息,理论上consumer-one处理能力是consumer-two的两倍

@Component
public class WorkListener {
    private int one = 1;
    private int two = 1;

    @RabbitListener(containerFactory = "workListenerFactory",
            queuesToDeclare = @Queue("workQueue"))
    public void onMessageOne(String msg) throws InterruptedException {
        Thread.sleep(100);
        System.out.println("consumer-one 第 " + one + " 个消息 :" + msg);
        one++;
    }

    @RabbitListener(containerFactory = "workListenerFactory",
            queuesToDeclare = @Queue("workQueue"))
    public void onMessageTwo(String msg) throws InterruptedException {
        Thread.sleep(200);
        System.out.println("consumer-two 第 " + two + " 个消息 :" + msg);
        two++;
    }
}

生产者,使用了上一篇中介绍的默认交换机

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private void send() {
        for (int i = 0; i < 100; i++) {
            rabbitTemplate.convertAndSend("workQueue", "this is a message");
        }
    }

执行结果如下,符合预期,两个消费者几乎同时消费完毕,且one消费的消息数是two的两倍

......
consumer-two 第 31 个消息 :this is a message
consumer-one 第 62 个消息 :this is a message
consumer-one 第 63 个消息 :this is a message
consumer-two 第 32 个消息 :this is a message
consumer-one 第 64 个消息 :this is a message
consumer-one 第 65 个消息 :this is a message
consumer-two 第 33 个消息 :this is a message
consumer-one 第 66 个消息 :this is a message
consumer-two 第 34 个消息 :this is a message

消息格式转换

rabbirMQ中的消息对应到java中对应的实体类是 org.springframework.amqp.core.Message,所以消息转换接口MessageConverter 有两个主要方法 toMessage 和 fromMessage 顾名思义,即将发送的内容与Message的互转

SimpleMessageConverter

spring中默认使用的是 SimpleMessageConverter 它的两个转化方法如下

toMessage,根据 object instanceof xxx 转化


toMessage.png

fromMessage,根据MessageProperties的ContentType转换

fromMessage.png

所以你大可以自己实现MessageConverter 接口自己转换,当然spring也提供了常用的转化,如转json,xml

Jackson2JsonMessageConverter

常用的将object与json互转

生产者

@Autowired
private RabbitTemplate rabbitTemplate;

private void send() {
    //实际项目不建议这么干,spring单例模式,
    // 所以最好自己构建一个"jasonRabbitTemplate",用的使用注入
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    rabbitTemplate.convertAndSend("jsonQueue", new Student("zhangSan",15,"男"));
}

消费者

@Bean("jasonTemplate")
public RabbitTemplate jasonRabbitTemplate(ConnectionFactory connectionFactory) {
    Jackson2JsonMessageConverter messageConverter = 
        new Jackson2JsonMessageConverter();
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    //设置转化类
    rabbitTemplate.setMessageConverter(messageConverter);
    return rabbitTemplate;
}

...

@Component
@RabbitListener(containerFactory = "jsonListenerFactory",
                queuesToDeclare = @Queue("jsonQueue"))
public class JasonListener {

    @RabbitHandler
    public void onMessage(Student student) {
        System.out.println(student);
    }
}

消息内容:

json.png

转载请注明出处
系列文章

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335