第三章----SpringBoot+RabbitMQ基于注解和代码之HelloWorld

上一章讲解了RabbitMQ的一些基础概念,包括:RabbitMQ概念、生产者(producer)、消费者(consumer)、信道(channel)、队列(queue)、交换器(exchange)(direct、fanout、topic)、绑定(binding)、路由键(routing key)、持久化(durable)等,本章开始写第一个HelloWorld程序,话不多说,直接上代码。

1. 项目搭建配置

  • SpringBoot项目搭建

  • Maven引入RabbitMQ jar

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 配置RabbitMQ
# RabbitMQ配置:IP、端口、用户名、密码、vhost
spring.rabbitmq.host=192.168.89.168
spring.rabbitmq.port=5672
spring.rabbitmq.username=fzb
spring.rabbitmq.password=fzb2019
spring.rabbitmq.virtual-host=fzb_host
  • 生成User表
CREATE TABLE `user` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
    `name` varchar(255) DEFAULT NULL COMMENT '姓名',
    `age` int(11) DEFAULT NULL COMMENT '年龄',
    `birthday` timestamp NULL DEFAULT NULL COMMENT '生日',
    `salary` decimal(10,2) DEFAULT NULL COMMENT '年薪',
    `create_date` timestamp NULL DEFAULT NULL COMMENT '创建时间',
    `update_date` timestamp NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
INSERT INTO `user` VALUES ('1', '张三', '18', '2008-02-29 15:47:42', '5000000.00', '2008-02-29 15:47:42', '2019-04-12 14:35:24');
INSERT INTO `user` VALUES ('2', '李四', '17', '2008-02-29 15:47:42', '5000000.00', '2019-03-01 15:48:09', '2019-04-12 14:35:29');
INSERT INTO `user` VALUES ('3', '王五', '3', '2018-02-28 15:49:15', '50000000.00', '2019-03-04 09:38:09', '2019-04-12 14:35:16');

2. 基于代码消息队列示例

  • 新建配置 MQConfig.java类,@Component 把该类注册成组件, @Bean 创建交换器、队列及他们的绑定关系。
  • 声明Exchange(交换器名称,durable,autoDelete)
  • 声明Queue(队列名称,durable,autoDelete)
  • 绑定:BindingBuilder绑定队列到交换器,并设置路由键。
package com.fzb.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;

/**
 * @Description MQ配置信息
 *
 * 基于代码的绑定交换器、队列、路由键设置
 * 1. 声明Exchange(交换器名称,durable,autoDelete)
 * 2. 声明Queue(队列名称,durable,autoDelete)
 * 3. 绑定:BindingBuilder绑定队列到交换器,并设置路由键
 * @Author jxb
 * @Date 2019-03-10 10:25:30
 */
@Component
public class MQConfig {

    /**
     * @Description 创建1:1 类型交换器(direct)
     * new DirectExchange(String,boolean,boolean)
     * new FanoutExchange(String,boolean,boolean)
     * new TopicExchange(String,boolean,boolean)
     * 1. 交换器名称
     * 2. durable 是否持久化 默认true
     * 3. autoDelete 是否自动删除 默认false
     * @Author jxb
     * @Date 2019-03-02 14:26:59
     */
    @Bean
    private DirectExchange directExchange() {
        return new DirectExchange("direct.exchange");
    }

    /**
     * @Description 创建队列
     * new Queue(String,boolean,boolean,boolean)
     * 1. 队列名称
     * 2. durable 是否持久化 默认true
     * 3. exclusive 排他队列,第一个链接消费后自动删除 默认 false
     * 4. autoDelete 是否自动删除 默认false
     * @Author jxb
     * @Date 2019-03-02 14:12:31
     */
    @Bean
    private Queue directQueue() {
        return new Queue("direct.queue");
    }

    /**
     * @Description 绑定队列、交换器、路由键
     * @Author jxb
     * @Date 2019-03-04 16:43:08
     */
    @Bean
    private Binding bindingDirect() {
        return BindingBuilder.bind(directQueue()).to(directExchange()).with("HelloWorld");
    }
}
  • 新建生产者 MQProducer.java类,@RestController 注解为一个控制类,@RequestMapping("mqProducer")
    设置访问路径。
  • @Autowired 注入RabbitTemplate工具类(SpringBoot集成RabbitMQ自带的)
  • @Autowired 注入userService访问数据库(需连接数据库访问User表)
package com.fzb.rabbitmq.producer;

import com.fzb.user.bean.User;
import com.fzb.user.service.UserService;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * @Description 生产者
 * @Author jxb
 * @Date 2019-03-09 09:43:47
 */
@RestController
@RequestMapping("mqProducer")
public class MQProducer {

    @Autowired
    public RabbitTemplate rabbitTemplate;

    @Autowired
    public UserService userService;

    /**
     * @Description direct 1:1 类型 交换器队列 生产
     * @Author jxb
     * @Date 2019-03-09 09:56:45
     */
    @RequestMapping(value = "/directMQ", method = {RequestMethod.GET})
    public List<User> directMQ() {
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
            rabbitTemplate.convertAndSend("direct.exchange", "HelloWorld", user, correlationData);
        }
        return users;
    }

}
  • 新建消费者 MQConsumer.java类,@Component 注册为组件
package com.fzb.rabbitmq.consumer;

import com.fzb.user.bean.User;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * @Description 消费者
 * @Author jxb
 * @Date 2019-03-09 09:43:47
 */
@Component
public class MQConsumer {

    /**
     * @Description direct 1:1 类型 交换器队列 消费
     * @Author jxb
     * @Date 2019-03-09 09:58:12
     */
    @RabbitListener(queues = "direct.queue")
    public void getDirectMessage(User user) throws Exception {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getDirectMessage:" + user.toString());
    }

    /**
     * @Description 配合楼上的队列,消费同一个队列,均匀分配到两个消费者
     * @Author jxb
     * @Date 2019-03-02 14:53:28
     */
    @RabbitListener(queues = "direct.queue")
    public void getDirectMessageCopy(User user) throws Exception {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
    }

}

至此一个基础的SpringBoot集成RabbitMQ的direct类型交换器队列创建完成。推荐基于注解试编程,更直观也符合SpringBoot约定大于配置的思想,所以本文会着重介绍基于注解讲解。

3. 基于注解消息队列示例

1. direct类型
  • 注释掉 MQConfig.java的 @Component 注解
  • 生产者不做修改
  • 消费者
package com.fzb.rabbitmq.consumer;

import com.fzb.user.bean.User;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 * 基于注解的绑定交换器、队列、路由键设置
 * 1. Queue配置:value=队列名称、durable=是否持久化(默认true)、exclusive=排他队列只在当前connection可用(默认false)、autoDelete=如无消息是否自动删除(默认false)
 * 2. Exchange配置:value=交换器名称、type=类型(默认direct)、durable=是否持久化(默认true)、autoDelete=如无消息是否自动删除(默认false)
 * 3. QueueBinding配置:key=路由键(string数组,支持* # 匹配),*必须匹配一个单词,#匹配0个或N个单词,用.分隔
 * 4. RabbitListener配置: bindings=Queue配置+Exchange配置+QueueBinding配置
 * 注:如果代码创建交换器等且配置绑定关系,注解只需监听队列即可,如:@RabbitListener(queues = "direct.queue")
 *
 * @Description 消费者
 * @Author jxb
 * @Date 2019-03-09 09:43:47
 */
@Component
public class MQConsumer {

    /**
     * @Description direct 1:1 类型 交换器队列 消费
     * @Author jxb
     * @Date 2019-03-09 09:58:12
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "direct.queue"), exchange = @Exchange(value = "direct.exchange"), key = "HelloWorld")})
    public void getDirectMessage(User user) throws Exception {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
    }

    /**
     * @Description 配合楼上的队列,消费同一个队列,均匀分配到两个消费者
     * @Author jxb
     * @Date 2019-03-02 14:53:28
     */
    @RabbitListener(queues = "direct.queue")
    public void getDirectMessageCopy(User user) throws Exception {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getDirectMessageCopy:" + user.toString());
    }
}
2. fanout类型
  • 生产者,1:N模式,所以不需要设置路由键,即使设置也会忽略
    /**
     * @Description fanout 1:n 类型 交换器队列 生产
     * @Author jxb
     * @Date 2019-03-09 09:56:45
     */
    @RequestMapping(value = "/fanoutMQ", method = {RequestMethod.GET})
    public List<User> fanoutMQ() {
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            rabbitTemplate.convertAndSend("fanout.exchange", "", user.getName());
        }
        return users;
    }
  • 消费者,定义了三个消费者,可以根据同一条消息做出不同的动作
    /**
     * @Description fanout 1:n 类型 交换器队列 消费(3个)
     * @Author jxb
     * @Date 2019-03-09 09:58:12
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.01"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
    public void getFanoutMessage01(String message) throws InterruptedException {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getFanoutMessage01:" + "短信通知:您好," + message + "!感谢您成为FZB会员");
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.02"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
    public void getFanoutMessage02(String message) throws InterruptedException {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getFanoutMessage02:" + "增加积分:您好," + message + "!您的当前积分为100");
    }

    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "fanout.queue.03"), exchange = @Exchange(value = "fanout.exchange", type = "fanout"))})
    public void getFanoutMessage03(String message) throws InterruptedException {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getFanoutMessage03:" + "通知好友:您好,您的朋友" + message + "已成为FZB会员,赶快一起互动吧");
    }
3. topic类型
  • 生产者,三种不同路由键的topic交换器,会根据规则路由到不同的队列
    /**
     * @Description topic n:1 类型 交换器队列 生产(3个)
     * @Author jxb
     * @Date 2019-03-09 09:56:45
     */
    @RequestMapping(value = "/topicMQ01", method = {RequestMethod.GET})
    public List<User> topicMQ01() {
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            rabbitTemplate.convertAndSend("topic.exchange", "jd.reg.msg", user.getName());
        }
        return users;
    }

    @RequestMapping(value = "/topicMQ02", method = {RequestMethod.GET})
    public List<User> topicMQ02() {
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            rabbitTemplate.convertAndSend("topic.exchange", "tm.reg.msg", user.getName());
        }
        return users;
    }

    @RequestMapping(value = "/topicMQ03", method = {RequestMethod.GET})
    public List<User> topicMQ03() {
        List<User> users = userService.getUserList(null);
        for (User user : users) {
            rabbitTemplate.convertAndSend("topic.exchange", "super.fzb.reg.msg", user.getName());
        }
        return users;
    }
  • 消费者,模糊匹配规则:“.”把路由键分成了几部分,“*”匹配一个词,“#”匹配0个或N个。

    /**
     * @Description topic n:1 类型 交换器队列 消费(普通会员注册提醒)
     * @Author jxb
     * @Date 2019-03-02 14:55:16
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "topic.queue.01"), exchange = @Exchange(value = "topic.exchange", type = "topic"), key = {"*.reg.msg"})})
    public void getTopicMessage01(String message) throws InterruptedException {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getTopicMessage01:" + "短信通知:您好," + message + "!感谢您成为FZB会员");
    }

    /**
     * @Description topic n:1 类型 交换器队列 消费(超级会员注册提醒)
     * @Author jxb
     * @Date 2019-03-02 14:55:16
     */
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "topic.queue.02"), exchange = @Exchange(value = "topic.exchange", type = "topic"), key = {"*.*.reg.msg.#"})})
    public void getTopicMessage02(String message) throws InterruptedException {
        // 模拟执行任务
        Thread.sleep(1000);
        System.out.println("--jxb--MQConsumer--getTopicMessage02:" + "短信通知:您好," + message + "!感谢您成为FZB超级会员,祝您玩的开心");
    }

direct、fanout、topic介绍完成,运行结果,自行检验。


生活好苦,但你好甜

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,602评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,442评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,878评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,306评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,330评论 5 373
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,071评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,382评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,006评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,512评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,965评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,094评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,732评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,283评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,286评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,512评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,536评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,828评论 2 345