1.引入依赖
<!-- 2.1.1对应rocketmq版本为4.7.1, 2.2.0对应rocketmq版本为:4.8.0 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
2.Rocket配置
1.yml配置
需要注意name-server是rocket部署的ip+端口
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: test
2.config
@Component
public class RocketMQConfig {
@Resource
private RocketMQTemplate mqTemplate;
@PostConstruct
public void init(){
RocketMQProducer.mqTemplate = mqTemplate;
}
}
3.消息生产者
@Slf4j
public class RocketMQProducer {
public static RocketMQTemplate mqTemplate;
/**
* 发送同步消息
* @param topic 主题
* @param body 消息
* @author tyg
* @date 2021-03-24 14:55
* @return void
*/
public static SendResult send(String topic, String body){
SendResult result = mqTemplate.syncSend(topic, MessageBuilder.withPayload(body).build());
if (result.getSendStatus() != SendStatus.SEND_OK){
// 可自行处理失败逻辑
log.error("\n=======消息发送失败,topic:{},数据:{}", topic, body);
}
return result;
}
/**
* 发送异步消息 在SendCallback中处理成功失败时的逻辑
* @param topic 主题
* @param body 消息
* @param callback 异步通知消息
* @author tyg
* @date 2021-03-24 14:55
* @return void
*/
public static void sendAsync(String topic, String body, SendCallback callback){
mqTemplate.asyncSend(topic, MessageBuilder.withPayload(body).build(), callback);
}
/**
* 发送延时消息
* 延时消息等级分为18个:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* @param topic 主题
* @param body 消息
* @param delayLevel 延迟等级
* @author tyg
* @date 2021-03-24 14:55
* @return void
*/
public static void sendDelay(String topic, String body,SendCallback callback, Integer delayLevel){
mqTemplate.asyncSend(topic,MessageBuilder.withPayload(body).build(),callback,3000,delayLevel);
}
/**
* 发送带tag的消息,格式:topic:tag,示例:order_topic:myTag
* @param topic 主题
* @param tag tag
* @param body 消息
* @author tyg
* @date 2021-03-24 14:55
* @return void
*/
public static void sendTag(String topic, String tag, String body){
SendResult result = mqTemplate.syncSend(String.format("%s:%s", topic, tag), MessageBuilder.withPayload(body).build());
if (result.getSendStatus() != SendStatus.SEND_OK){
log.error("\n=======消息发送失败,topic:{},tag:{},数据:{}", topic, tag, body);
}
}
/**
* 单向(Oneway)发送,不可靠,可能存在丢数据的风险,建议在一些日志收集时使用
* 由于在 oneway 方式发送消息时没有请求应答处理,一旦出现消息发送失败,则会因为没有重试而导致数据丢失。若数据不可丢,建议选用可靠同步或可靠异步发送方式。
* @param topic 主题
* @param body 消息
* @author tyg
* @date 2021-03-24 14:55
* @return void
*/
public static void sendOneWay(String topic, String body){
mqTemplate.sendOneWay(topic, MessageBuilder.withPayload(body).build());
}
}
.
4.消息消费者
@Service
@RocketMQMessageListener(consumerGroup = "test", topic = "message")
public class DemoConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println(s);
}
}
5.发送普通消息和发送延迟消息
/**
* 发送普通消息
*/
@GetMapping("send")
public String send(String s){
User user = new User();
user.setId(1);
user.setName(s);
RocketMQProducer.send("message", JSON.toJSONString(user));
return "成功";
}
/**
* 发送延迟消息
* 延时消息等级分为18个:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* delayLevel对应延迟等级就是上面的时间 共18个等级
* rocketmq只能指定延迟等级而不能自定义延迟时间,如果想自定义需要阿里巴巴提供的企业版rocketmq要收费
*/
@GetMapping("sendDelay")
public String sendDelay(){
// 延时消息等级分为18个:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
RocketMQProducer.sendDelay("message","张三",4);
return "成功";
}