producer
/**
 * Demo producer: sends ten messages to topic {@code TopicTest3} using the ACL
 * hook supplied by {@link ACLClient#getAclRPCHook()}.
 *
 * <p>Each message carries a user property {@code coal} holding the loop index,
 * which is the property the SQL filter in {@code AclConsumer}
 * ({@code "coal between 2 and 7"}) is evaluated against.
 */
public class Producer1 {
    /**
     * Starts the producer, sends the messages synchronously and prints every
     * {@code SendResult}.
     *
     * @param agrs command-line arguments (unused)
     * @throws MQClientException            if the client fails to start or send
     * @throws UnsupportedEncodingException if {@code RemotingHelper.DEFAULT_CHARSET} is not supported
     * @throws RemotingException            on a network-level failure while sending
     * @throws InterruptedException         if the sending thread is interrupted
     * @throws MQBrokerException            if the broker rejects the message
     */
    public static void main(String[] agrs) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer producer = new DefaultMQProducer("TopicTest3-produceGroup1", ACLClient.getAclRPCHook());
        // Specify name server addresses.
        producer.setNamesrvAddr("10.1.54.46:9876");
        producer.setInstanceName("producer 1");
        // Launch the instance.
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                // Create a message instance, specifying topic, tag, keys and message body.
                byte[] body = ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
                Message msg = new Message("TopicTest3" /* Topic */,
                        "abc" /* Tag */,
                        "OrderID188" /* Keys */,
                        body /* Message body */);
                // User property that SQL92 subscriptions can filter on.
                msg.putUserProperty("coal", String.valueOf(i));
                // Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
        } finally {
            // Always shut down, even when a send fails, so the started client
            // instance is not left running after an exception.
            producer.shutdown();
        }
    }
}
consumer
/**
 * Demo push consumer for topic {@code TopicTest3}: connects with the ACL hook
 * from {@link ACLClient#getAclRPCHook()}, subscribes with an SQL92 property
 * filter and hands received messages to the shared {@code MessageListener}.
 */
public class AclConsumer {
    /**
     * Configures and starts the consumer, then prints a start-up line.
     *
     * @param agrs command-line arguments (unused)
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] agrs) throws MQClientException {
        // Consumer group, ACL hook and queue allocation strategy.
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(
                "consumerGroup5",
                ACLClient.getAclRPCHook(),
                new AllocateMessageQueueAveragely());
        pushConsumer.setNamesrvAddr("localhost:9876");
        pushConsumer.setInstanceName("consumer 5");

        // Subscription mode: MessageModel.CLUSTERING (cluster) or
        // MessageModel.BROADCASTING (broadcast).
        pushConsumer.setMessageModel(MessageModel.BROADCASTING);

        // Where to start consuming:
        //   CONSUME_FROM_LAST_OFFSET  - default; start at the tail of the queue, skipping history
        //   CONSUME_FROM_FIRST_OFFSET - start at the head; replay everything still stored on the broker
        //   CONSUME_FROM_TIMESTAMP    - start at a point in time; used together with
        //                               setConsumeTimestamp(), default is half an hour ago
        pushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        // Number of messages consumed per batch; the default is one message at a time.
        pushConsumer.setConsumeMessageBatchMaxSize(10);

        // Topic and filter to subscribe to. With MessageSelector.byTag, "*" would mean all tags;
        // here an SQL92 expression on the user property "coal" is used instead.
        MessageSelector coalFilter = MessageSelector.bySql("coal between 2 and 7");
        pushConsumer.subscribe("TopicTest3", coalFilter);

        // Register the consumption listener.
        pushConsumer.registerMessageListener(MessageListener.getInstance());

        pushConsumer.start();
        // NOTE(review): the text says "consumer 2" although the instance name set above
        // is "consumer 5" - confirm which one is intended.
        System.out.println("consumer 2 is started");
    }
}
ACLClient
/**
 * Factory for the ACL {@code RPCHook}s used by the demo clients. Each factory
 * method wraps one fixed access-key / secret-key pair in an
 * {@code AclClientRPCHook}.
 *
 * <p>NOTE(review): the credentials are hard-coded, which is only acceptable for
 * a local demo.
 */
public class ACLClient {
    private static final String ACL_ACCESS_KEY = "RocketMQ2";
    private static final String ACL_SECRET_KEY = "12345678";

    private static final String TEST_ACL_ACCESS_KEY = "testKey";
    private static final String TEST_ACL_SECRET_KEY = "12345678";

    // Presumably a pair that is not configured on the broker, for negative tests - confirm.
    private static final String ERROR_ACL_ACCESS_KEY = "RocketMQ333";
    private static final String ERROR_ACL_SECRET_KEY = "12345673333";

    /** Returns a hook built from the {@code ACL_*} credentials. */
    static RPCHook getAclRPCHook() {
        return hookFor(ACL_ACCESS_KEY, ACL_SECRET_KEY);
    }

    /** Returns a hook built from the {@code TEST_ACL_*} credentials. */
    static RPCHook getTestAclRPCHook() {
        return hookFor(TEST_ACL_ACCESS_KEY, TEST_ACL_SECRET_KEY);
    }

    /** Returns a hook built from the {@code ERROR_ACL_*} credentials. */
    static RPCHook getErrorAclRPCHook() {
        return hookFor(ERROR_ACL_ACCESS_KEY, ERROR_ACL_SECRET_KEY);
    }

    /** Wraps the given key pair in session credentials and an ACL client hook. */
    private static RPCHook hookFor(String accessKey, String secretKey) {
        SessionCredentials credentials = new SessionCredentials(accessKey, secretKey);
        return new AclClientRPCHook(credentials);
    }
}
Consumer 的 MessageListener
/**
 * Concurrent message listener that prints the body of every received message
 * and acknowledges the batch as consumed. Exposed as a lazily created singleton.
 */
public class MessageListener implements MessageListenerConcurrently {
    // volatile is required for the double-checked locking in getInstance():
    // without it another thread may observe a non-null reference to an object
    // whose construction is not yet visible.
    private static volatile MessageListener instance = null;

    /**
     * Returns the single shared listener, creating it on first use.
     *
     * @return the singleton instance, never {@code null}
     */
    public static MessageListener getInstance() {
        MessageListener current = instance;
        if (current == null) {
            synchronized (MessageListener.class) {
                current = instance;
                if (current == null) {
                    current = new MessageListener();
                    instance = current;
                }
            }
        }
        return current;
    }

    /**
     * Prints a timestamped line with the body of each message in the batch.
     *
     * @param msgs                       the batch of messages to consume
     * @param consumeConcurrentlyContext consumption context (unused)
     * @return always {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        // SimpleDateFormat is not thread-safe, so it stays local to this call;
        // one instance per batch is enough.
        SimpleDateFormat timestampFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        for (MessageExt messageExt : msgs) {
            // Decode explicitly as UTF-8 instead of the platform default charset. The producer
            // encodes with RemotingHelper.DEFAULT_CHARSET - UTF-8 per RocketMQ's definition, confirm.
            String messageBody = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            // Print the message content.
            System.out.println(timestampFormat.format(new Date()) + "消费响应: msgBody : " + messageBody);
        }
        // ACK:
        //   CONSUME_SUCCESS - consumed successfully
        //   RECONSUME_LATER - consumption failed, redeliver later
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /** Private: instances are obtained through {@link #getInstance()}. */
    private MessageListener() {
    }
}
Broker配置文件介绍
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# brokerClusterName = DefaultCluster
# brokerName = broker-a
# brokerId = 0
# deleteWhen = 04
# fileReservedTime = 48
# brokerRole = ASYNC_MASTER
# flushDiskType = ASYNC_FLUSH
# Name of the cluster this broker belongs to
brokerClusterName = default-rocketmq-cluster
# When true, SQL92 filtering can be used (e.g. MessageSelector.bySql on the consumer)
enablePropertyFilter = true
# Broker name; note that it differs between the individual config files
brokerName = broker-a
# 0 means Master, >0 means Slave
brokerId = 0
# IP address of this broker
brokerIP1 = 192.168.6.46
# NameServer address(es), separated by semicolons
namesrvAddr = 192.168.6.46:9876
# Default number of queues for a topic that is auto-created when a message is sent to a topic the server does not have
defaultTopicQueueNums = 4
# Whether the broker may auto-create topics; recommended on for dev/test, off for production
autoCreateTopicEnable = true
# Whether the broker may auto-create subscription groups; recommended on for dev/test, off for production
autoCreateSubscriptionGroup = true
# Port the broker listens on for external service
listenPort = 10911
# Time of day at which expired files are deleted; default is 4 a.m.
deleteWhen = 04
# File retention time; default is 48 hours
fileReservedTime = 120
# Size of each commitLog file; default is 1 GB
mapedFileSizeCommitLog = 1073741824
# Each ConsumeQueue file holds 300,000 entries by default; adjust to the workload
mapedFileSizeConsumeQueue = 300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
# Disk space check for the physical files (maximum used-space ratio)
diskMaxUsedSpaceRatio = 88
# NOTE: backslashes in the paths below are doubled on purpose. If this file is loaded with
# java.util.Properties (as RocketMQ's BrokerStartup is understood to do - confirm), a single
# backslash starts an escape sequence, so "\t" in "...\target" would become a TAB character.
# Root storage path
storePathRootDir = D:\\RocketMQ\\target
# commitLog storage path
storePathCommitLog = D:\\RocketMQ\\target\\commitLog
# Consume queue storage path
storePathConsumeQueue = D:\\RocketMQ\\target\\consumequeue
# Message index storage path
storePathIndex = D:\\RocketMQ\\target\\index
# checkpoint file storage path
storeCheckpoint = D:\\RocketMQ\\target\\checkpoint
# Broker role
# - ASYNC_MASTER  master with asynchronous replication
# - SYNC_MASTER   master with synchronous double write
# - SLAVE
brokerRole = ASYNC_MASTER
# Disk flush mode
# - ASYNC_FLUSH  asynchronous flush
# - SYNC_FLUSH   synchronous flush
flushDiskType = ASYNC_FLUSH
#checkTransactionMessageEnable=false
# abort file storage path
abortFile = D:\\RocketMQ\\target\\abort
# Maximum allowed message size
maxMessageSize = 65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
# ACL access control
aclEnable=true