我们在上一篇文章中已经实现了ActiveMQ的高可用集群,但是无法做到负载均衡.接下来我们通过修改配置文件实现支持负载均衡的集群.
步骤一:
基于上一篇文章的环境下,我们还是在同一台CentOS中来完成这个实验.需要创建6个ActiveMQ的节点.要实现多主多从,我们把他们分成两组(Group)
group1
Master Broker(61616)+Slave Borker(61617)+Slave Borker(61618)
group2
Master Broker(61619)+Slave Borker(61620)+Slave Borker(61621)
目录创建如下:
步骤二:
因为我们是在一台机器上做实验,所以不同的ActiveMQ需要有不同的端口配置,具体分配如下:
节点 | openwire端口 | amqp端口 | stomp端口 | mqtt端口 | ws端口 | admin端口 |
---|---|---|---|---|---|---|
mq1 | 61616 | 5672 | 61613 | 1883 | 61614 | 8161 |
mq2 | 61617 | 5682 | 61623 | 1903 | 61634 | 8162 |
mq3 | 61618 | 5692 | 61633 | 1923 | 61654 | 8163 |
mq4 | 61619 | 5702 | 61643 | 1933 | 61664 | 8165 |
mq5 | 61620 | 5712 | 61653 | 1943 | 61674 | 8166 |
mq6 | 61621 | 5722 | 61673 | 1963 | 61694 | 8167 |
具体配置的修改同学们就自己去修改,我在文章中就不贴出来了.
步骤三:
我们要实现的效果是,我们可以往两个Group中的Master Broker节点发送任意消息.
然后消费端连接任意一台机器都可以获取到所有的消息.
这时候我们需要在配置文件中配置,让两组的Group之间的机器是可见的.
1 ) 在group1中的所有节点需要修改activemq.xml文件,添加如下配置:
vi /usr/local/mqcluster/mq1/conf/activemq.xml
:这段配置添加在节点persistenceAdapter
前
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.122.129:61619,tcp://192.168.122.129:61620,tcp://192.168.122.129:61621)" duplex="true"/>
</networkConnectors>
- 在group2中的所有节点需要修改
activemq.xml
文件,添加如下配置:
vi /usr/local/mqcluster/mq4/conf/activemq.xml
:这段配置添加在节点persistenceAdapter
前
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.122.129:61616,tcp://192.168.122.129:61617,tcp://192.168.122.129:61618)" duplex="true"/>
</networkConnectors>
在group2中的所有节点还需要修改activemq.xml中的persistenceAdapter
内容:
<persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="192.168.122.129:2181"
zkPassword=""
hostname="localhost"
sync="local_disk"
zkPath="/activemq/leveldb-stores/group2"
/>
</persistenceAdapter>
修改zkPath="/activemq/leveldb-stores/group2"
,其他不用变.
好,到这一步我们所有的配置都配置好了.接下来就做测试了,测试的代码还是原来那套.
测试:
- 将6台ActiveMQ的示例启动起来.
- 使用group1的生产者发送3条消息,生产者的地址配置:
ConnectionFactory factory = new ActiveMQConnectionFactory(
"lanxw",
"lanxw",
"failover:(tcp://192.168.122.129:61616,tcp://192.168.122.129:61617,tcp://192.168.122.129:61618)?randomize=false"
);
- 使用group2的生产者发送3条消息,生产者地址配置:
ConnectionFactory factory = new ActiveMQConnectionFactory(
"lanxw",
"lanxw",
"failover:(tcp://192.168.122.129:61619,tcp://192.168.122.129:61620,tcp://192.168.122.129:61621)?randomize=false"
);
4.运行消费端,是可以把group1和group2的消息消费掉的.消费端地址配置:
ConnectionFactory factory = new ActiveMQConnectionFactory(
"lanxw",
"lanxw",
"failover:(tcp://192.168.122.129:61616,tcp://192.168.122.129:61617,tcp://192.168.122.129:61618," +
"tcp://192.168.122.129:61619,tcp://192.168.122.129:61620,tcp://192.168.122.129:61621)?randomize=false");