1.启动producer发送1条消息
package com.young.rocketmq.quickstart;
import com.young.rocketmq.constants.Const;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
producer.start();
for (int i = 0; i < 1; i++) {
//1.创建消息
Message message = new Message(
"test_quick_topic",//主题
"TagA",//标签
"key" + i,//用户自定义的key,唯一的标识
("Hello RocketMQ" + i).getBytes()//消息内容实体(byte[])
);
//2.发送消息
SendResult sr = producer.send(message);
System.out.println("消息发出: " + sr);
}
producer.shutdown();
}
}
2.然后关掉主节点
进入到192.168.80.188服务器(主节点)里面
cd /usr/local/rocketmq/bin
sh mqshutdown broker
3.启动consumer消费
package com.young.rocketmq.quickstart;
import com.young.rocketmq.constants.Const;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
producer.start();
for (int i = 0; i < 1; i++) {
//1.创建消息
Message message = new Message(
"test_quick_topic",//主题
"TagA",//标签
"key" + i,//用户自定义的key,唯一的标识
("Hello RocketMQ" + i).getBytes()//消息内容实体(byte[])
);
//2.发送消息
SendResult sr = producer.send(message);
System.out.println("消息发出: " + sr);
}
producer.shutdown();
}
}
发现控制台已经将该消息消费掉了
consumer start...
topic: test_quick_topic,tags: TagA,key: key0,msgBody: Hello RocketMQ0