测试数据:
topic为:topic_a
consume group为:group
broker为:broker_a
consumer的cid分别为:cid_0,cid_1,...
queue id分别为:0,1,...
代码测试的是,3个mq,在2/3/4个consumer情况下的mq分配情况
public class StrategyTest {
public static void allocate(AllocateMessageQueueStrategy strategy, int mqSize, int cidSize) {
List<String> cidList = new ArrayList<String>();
for (int i = 0; i < cidSize; i++) {
cidList.add("cid_" + i);
}
List<MessageQueue> mqList = new ArrayList<MessageQueue>(mqSize);
for (int i = 0; i < mqSize; i++) {
mqList.add(makeMq(i));
}
for (int i = 0; i < cidSize; i++) {
doTest(strategy, i, cidList, mqList);
}
System.out.println("-----");
}
private static void doTest(AllocateMessageQueueStrategy strategy, int currentCid, List<String> cidList, List<MessageQueue> mqList) {
List<MessageQueue> result = strategy.allocate("group","cid_" + currentCid, mqList, cidList);
List<Integer> queueIdList = new ArrayList<>(result.size());
for (MessageQueue mq : result) {
queueIdList.add(mq.getQueueId());
}
System.out.println("cid: " + currentCid + " queueId: " + queueIdList);
}
private static MessageQueue makeMq(int queueId) {
MessageQueue q = new MessageQueue();
q.setTopic("topic_a");
q.setBrokerName("broker_a");
q.setQueueId(queueId);
return q;
}
public static void main(String[] args) {
test(new AllocateMessageQueueAveragely());
test(new AllocateMessageQueueAveragelyByCircle());
}
private static void test(AllocateMessageQueueStrategy strategy) {
System.out.println(strategy.getClass().getSimpleName());
System.out.println(" ");
StrategyTest.allocate(strategy, 3, 2);
StrategyTest.allocate(strategy, 3, 3);
StrategyTest.allocate(strategy, 3, 4);
System.out.println("|||||");
}
}
结果如下面所示:
cid: 0 queueId: [0, 1]
cid: 1 queueId: [2]
-----
cid: 0 queueId: [0]
cid: 1 queueId: [1]
cid: 2 queueId: [2]
-----
cid: 0 queueId: [0]
cid: 1 queueId: [1]
cid: 2 queueId: [2]
cid: 3 queueId: []
mqadmin工具提供了一个allocateMQ子命令,通过其我们可以预览某个Topic在多个消费者分区是如何分配的,使用方式如下:
sh bin/mqadmin allocateMQ -i ip1,ip2,ip3 -t TopicA -n localhost:9876
这个工具可以将模拟分配的结果进行json格式展示。