RocketMQ单机部署流程
1.下载源码包
地址 http://rocketmq.apache.org/docs/quick-start/
unzip rocketmq-all-4.4.0-source-release.zip
cd rocketmq-all-4.4.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq
使用maven打包源码包
完成后在distribution/target/apache-rocketmq目录下找到生成的文件
2.修改linux Hosts
vi /etc/hosts
添加
172.19.24.103 rocketmq-nameserver1
172.19.24.103 rocketmq-master1
nameserver和master映射关系
3.建立RocketMQ目录
mkdir /usr/local/apache-rocketmq
cp apache-rocketmq.tar.gz /usr/local/apache-rocketmq
cd /usr/local/apache-rocketmq
tar -zxvf apache-rocketmq.tar.gz
ln -s apache-rocketmq rocketmq
4.创建存储路径
mkdir /usr/local/rocketmq/store
mkdir /usr/local/rocketmq/store/commitlog
mkdir /usr/local/rocketmq/store/consumequeue
mkdir /usr/local/rocketmq/store/index
5.修改RocketMQ配置文件
vi /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties
内容如下
集群名称
brokerClusterName=rocketmq-cluster
brokerName=broker-a
brokerId=0
namesrvAddr=rocketmq-nameserver1:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=true
autoCreateSubScriptionGroup=true
listenPosr=10911
deleteWhen=04
fileReservedTime=120
mapedFileSizeCommitLog=1073741824
mapedFileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio=88
storePathRootDir=/usr/local/rocketmq/store
storePathCommitLog=/usr/local/rocketmq/store/commitLog
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
storePathIndex=/usr/local/rocketmq/store/index
storeCheckpooint=/usr/local/rocketmq/store/checkpoint
abortFile=/usr/local/rocketmq/store/abort
maxMessageSize=65536
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
6.修改日志文件路径
mkdir -p /usr/local/rocketmq/logs
cd /usr/local/rocketmq/conf
sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
7.修改启动脚本参数(主要防止配置内存过大,本地机器内存不够用,生产另说)
vi /usr/local/rocketmq/bin/runbroker.sh
修改jvm内存为最大,最小,新生代为1g
vi /usr/local/rocketmq/bin/runserver.sh
修改jvm内存为最大,最小,新生代为1g
8.启动
cd /usr/local/rocketmq/bin/
先启动 NameServer
nohup sh mqnamesrv &
再启动 broker
nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties > /dev/null 2>&1 &
9.查看日志
broker 日志
tail -f /usr/local/rocketmq/logs/rocketmqlogs/broker.log
tail -f /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
RocketMQ生产者使用
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer producer = new DefaultMQProducer("testQuickStart");
producer.setNamesrvAddr(Const.NAME_SERVER_ADDR);
producer.start();
for (int i = 0; i < 5; i++) {
Message message = new Message("test_quick_topic", "TagA", "Keys" + i, ("Hello RocketMQ" + i).getBytes());
SendResult res = producer.send(message);
System.out.println("消息发送结果" + res);
}
producer.shutdown();
}
}
运行代码可以发现Topic和Queue是一对多的关系,一个Topic发送多条消息会落到不同queueId的队列中去
消费端:
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_quick_consumer_name");
consumer.setNamesrvAddr(Const.NAME_SERVER_ADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe("test_quick_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = list.get(0);
try {
String topic = messageExt.getTopic();
String msgId = messageExt.getMsgId();
String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("topic " + topic + " msgId " + msgId + " body " + body);
int i = 1 / 0;
} catch (Exception e) {
e.printStackTrace();
int reconsumeTimes = messageExt.getReconsumeTimes();
System.out.println("reconsumeTimes " + reconsumeTimes);
if (reconsumeTimes == 3) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
可以看到RocketMQ会有自动重试机制,在捕获异常时RocketMQ Broker可以进行消息重发。
至此,完成了单机RocketMQ的搭建和简单生产者消费者的demo运行。