创建项目
创建 maven 项目,pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.rocketmq.test</groupId>
<artifactId>simpledemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
代码
1. Producer
package com.rocketmq.test;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 实例化 producer,指定一个唯一的组名
DefaultMQProducer producer = new
DefaultMQProducer("please_rename_unique_group_name");
// 指定 name server 地址
producer.setNamesrvAddr("localhost:9876");
// 启动 producer
producer.start();
for (int i = 0; i < 100; i++) {
// 创建消息实例, 指定 topic, tag 和 message
Message msg = new Message("TopicTest",
"TagA",
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 发送消息,同步方式,等待返回结果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 关闭
producer.shutdown();
}
}
2. Consumer
package com.rocketmq.test;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化 consumer,指定 consumer 组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 指定 name server
consumer.setNamesrvAddr("localhost:9876");
// 订阅1个或多个topic
consumer.subscribe("TopicTest", "*");
// 注册回调,当收到消息时执行
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(final List<MessageExt> msgs,
final ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动 consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
运行
mvn exec:java -Dexec.mainClass="com.rocketmq.test.SyncProducer"
mvn exec:java -Dexec.mainClass="com.rocketmq.test.Consumer"