ActiveMQ高级特性

连接

连接

了解BrokerUrl上能带哪些参数(参数的作用)
http://activemq.apache.org/connection-configuration-uri.html

2 Producer Send Message To MQ

2.1 Destination

https://activemq.apache.org/destination-features

2.1.1 Destination创建

我们的消息将发往哪里?
Queue 和 Topic 需要我们提前创建好吗?
https://activemq.apache.org/how-do-i-create-new-destinations
不需要,可以提前创建
https://activemq.apache.org/configure-startup-destinations

2.1.2 复合目标Composite Destinations

我们可能会有哪些发往目标需要?
发给一个Queue;发给一个Topic
会不会有需要:一个消息需发给多个queue或topic?

客户端复合目标:

https://activemq.apache.org/composite-destinations

// send to 3 queues as one logical operation 
Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C"); 
producer.send(queue, someMessage);

can use a prefix of queue:// or topic:// to differentiate the type of destination

// send to queues and topic one logical operation
Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A"); 
producer.send(queue, someMessage);
Broker 端复合目标 【虚拟目标】

https://activemq.apache.org/virtual-destinations

<!-- 在 activemq.xml 的broker节点下添加 --> 
<destinationInterceptors> 
  <virtualDestinationInterceptor>
    <virtualDestinations>
      <compositeQueue name="MY.QUEUE">
       <forwardTo>
         <queue physicalName="FOO" />
          <topic physicalName="BAR" /> 
      </forwardTo>
     </compositeQueue>
   </virtualDestinations>
 </virtualDestinationInterceptor>
 </destinationInterceptors>
Filter physical Destinations

虚拟目标+Selector

<destinationInterceptors>
 <virtualDestinationInterceptor>
 <virtualDestinations>
    <compositeQueue name="MY.QUEUE">
    <forwardTo>
     <filteredDestination selector="odd = 'yes'" queue="FOO"/>
     <filteredDestination selector="i = 5" topic="BAR"/>
    </forwardTo>
  </compositeQueue>
 </virtualDestinations>
 </virtualDestinationInterceptor>
 </destinationInterceptors>

2.2 发送方式

同步/异步

3 Consumer receive Message From MQ

3.1 Destination

https://activemq.apache.org/destination-features

3.1.1 从多个目标消费消息【复合目标】

从多个队列消费
订阅多个主题

// 在消费时,并不可以复合对列与主题
 new ConsumerThread("tcp://mq.study.com:61616", "queue2,topic://topic2").start(); 
// 在消费时,可以复合消费多个队列的消息 //
new ConsumerThread("tcp://mq.study.com:61616", "queue2,queue3").start(); 
// 可以订阅多个主题 
//new ConsumerThread("tcp://mq.study.com:61616", "topic1,topic2").start(); 
// 注意:如果同一条消息发给了多个队列或主题,消费者只会收到一次。
3.1.2 镜像队列

ActiveMQ每一个queue中消息只能被一个消费者消费,然而,有时候,你希望能够监视生产者和消费 者之间的消息流。即你也想获得queue中的消息(但queue不能像topic一样发给多个消费者)。为解决 这种需要,ActiveMQ提供了镜像队列功能:将队列中的消息copy一份到一个主题中,我们想监控某队 列的消息就订阅这个主题。
https://activemq.apache.org/mirrored-queues

<broker ......., useMirroredQueues="true">
    ....    
<destinationInterceptors>
        <!--prefix指定主题的前缀,默认是 VirtualTopic.Mirror
.             postfix 指定主题的后缀 默认无
        -->
        <mirroredQueue copyMessage="true" postfix=".qmirror" prefix=""/>
</destinationInterceptors> 
   ....
 </broker>

如你想获得队列 Foo.Bar 的消息,你可以订阅主题 VirtualTopic.Mirror.Foo.Bar

3.1.3 主题-集群负载均衡

Virtual Topic
https://activemq.apache.org/virtual-destinations

3.1.4 Destination Options

ActiveMQ 是采用推的模式来实时传递消息。
https://activemq.apache.org/destination-options

3.1.5 Per Destination Policies

https://activemq.apache.org/per-destination-policies

3.1.6 Prefetch

问题:ActiveMQ是如何将消息推送给消费者的?
推送一个,收到确认后,再推送下一个?
这样效率怎样?
怎样来提高效率?
提前送达一批消息给消费者处理。
ActiveMQ默认的Prefetch策略:

  • persistent queues (default value: 1000 )
  • non-persistent queues (default value: 1000 )
  • persistent topics (default value: 100 )
  • non-persistent topics (default value: Short.MAX_VALUE - 1 )
    http://activemq.apache.org/what-is-the-prefetch-limit-for
    The prefetch limit can also be configured on the connection URI used to establish a connection the broker. To change the prefetch limit for all consumer types configure a connection URI as follows:
tcp://localhost:61616?jms.prefetchPolicy.all=50

To change the prefetch limit for queue consumers only configure the connection URI as follows:

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

It can also be configured on a per consumer basis using Destination Options:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10"); 
consumer = session.createConsumer(queue);

3.2 Message Dispatch 派发

队列被多个消费者共享时,消息如何派发给消息者?
https://activemq.apache.org/dispatch-policies

3.3 Dispatch async

http://activemq.apache.org/consumer-dispatch-async

3.4 Consumer

3.4.1 消费者优先级

有时我们希望那些硬件资源充裕,网络环境好的消费者优先来接收处理消息,可以通过设置消费者的优 先级达成。
http://activemq.apache.org/consumer-priority
The priority for a consumer is set using Destination Options as follows:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10"); 
consumer = session.createConsumer(queue);

The range of priority values is: 0 to 127. The highest priority is 127. The default priority is 0. The broker orders a queue’s consumers according to their priorities, dispatching messages to the highest priority consumers first. Once a particular consumer’s prefetch buffer is full the broker will start dispatching messages to the consumer with the next lowest priority whose prefetch buffer is not full.

3.4.2 严格顺序处理消息

场景:队列中的消息必须按队列中的顺序一个接一个串行处理。
这怎么实现?
队列一个消费者。
如何保证高可用,万一这个消费者挂了怎么办?
ActiveMQ 中提供了独家消费者 和 消息组 来达成。
Exclusive Consumer 独家消费者
http://activemq.apache.org/exclusive-consumer

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);

多个消费者都以独家方式共享同一队列,只有一个会消费消息,当它挂了时,会选择下一个来发送。
message group 消息分组
消息分组是对独家消费者的增强,能负载均衡
http://activemq.apache.org/message-groups
同组的消息发给同一消费者

4 Broker delivery message 递送消息

递送模式 NON_PERSISTENT
Producer Flow Control 生产者流量控制
http://activemq.apache.org/producer-flow-control.html
对于 NON_PERSISTENT 消息,ActiveMQ默认采用 限量内存 + 限量临时文件 来限流。

  • 不进行流量控制
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic="FOO.>" producerFlowControl="false"/>
    </policyEntries>
  </policyMap>
 </destinationPolicy>
  • 不使用临时文件存储
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">      
<pendingQueuePolicy>
    <vmQueueCursor/>
 </pendingQueuePolicy>
 </policyEntry>
  • 空间不够时,不阻塞而是发送异常给生产者
<systemUsage>
 <systemUsage sendFailIfNoSpace="true">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
 </systemUsage>
 </systemUsage>
  • 空间不够时,阻塞等待一定时间后还是没有空间再发送异常给生产者
<systemUsage>
 <systemUsage sendFailIfNoSpaceAfterTimeout="3000">
   <memoryUsage>
     <memoryUsage limit="20 mb"/>
   </memoryUsage>
 </systemUsage>
 </systemUsage>
  • 通过系统用量来限流量
<systemUsage>
  <systemUsage>
    <memoryUsage>
      <memoryUsage limit="64 mb" />
      <!-- 也可以使用百分比 -->
      <!-- <memoryUsage percentOfJvmHeap="70" />  -->
    </memoryUsage>
    <storeUsage>
      <storeUsage limit="100 gb" />
    </storeUsage>
    <tempUsage>
      <tempUsage limit="10 gb" />
    </tempUsage>
  </systemUsage>
 </systemUsage>

You can set limits of memory for NON_PERSISTENT messages, disk storage for PERSITENT messages and total usage for temporary messages, the broker will use before it slowdown producers. Using the default settings shown above, the broker will block the send() call until some messages are consumed and space becomes available on the broker. The default values are shown above, you will probably need to increase these values for your environment.

5.持久化

5.1 消息传递过程
image.png
5.2 ActiveMQ支持的持久化方式

官网介绍链接:http://activemq.apache.org/persistence

  • AMQ
  • JDBC ActiveMQ V4 加入
  • KahaDB ActiveMQ V5.3 加入,5.4开始为默认持久化方式
  • LevelDB ActiveMQ V5.8 加入 官方已废弃并不在支持 Replicated LevelDB Store
  • ActiveMQ V5.8 加入 官方已废弃并不在支持
5.2.1 配置方式

http://activemq.apache.org/schema/core/activemq-core.xsd

<xs:element name="persistenceAdapter" maxOccurs="1" minOccurs="0">    
  <xs:annotation>
        <xs:documentation>
        <![CDATA[Sets the persistence adaptor implementation to use for this broker ]]>
        </xs:documentation>
    </xs:annotation>
    <xs:complexType>
        <xs:choice maxOccurs="1" minOccurs="0">
            <xs:element ref="tns:jdbcPersistenceAdapter"/>
            <xs:element ref="tns:journalPersistenceAdapter"/>
            <xs:element ref="tns:kahaDB"/>
            <xs:element ref="tns:levelDB"/>
            <xs:element ref="tns:mKahaDB"/>
            <xs:element ref="tns:memoryPersistenceAdapter"/>
            <xs:element ref="tns:replicatedLevelDB"/>
            <xs:any namespace="##other"/>
        </xs:choice>
    </xs:complexType>
 </xs:element>
5.2.2 AMQ

基于文件存储。它具有写入速度快和容易恢复的特点,但是由于其重建索引时间过长,而且索引文件占 用磁盘空间过大,所以已经不推荐使用。
官网详细介绍:https://activemq.apache.org/amq-message-store

<broker brokerName="broker" >
    <persistenceAdapter>
      <amqPersistenceAdapter directory="${activemq.base}/activemq-data" maxFileLength="32mb"/>
    </persistenceAdapter>
  </broker>
JDBC

官网详细说明:https://activemq.apache.org/jdbc-support
对于长期的持久性,建议使用JDBC和高性能日志。如果您愿意,可以只使用JDBC,但是它非常慢。 JDBC + 高性能日志用: journalPersistenceAdapter 参考:http://activemq.apache.org/persistence
MySQL 示例

  1. 引入mysql的驱动jar,放到 activemq的 lib目录下
  2. 配置 activemq.xml
<broker ...>
        ...        
        <persistenceAdapter>
            <!-- <kahaDB directory="${activemq.data}/kahadb"/> -->            
            <jdbcPersistenceAdapter dataSource="#mysql-ds" />
        </persistenceAdapter>
        ... 
   </broker>
 <!-- MySql DataSource Sample Setup -->
     <bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
         <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
         <!-- 【注意】一定要带参数 relaxAutoCommit=true -->
        <property name="url" value="jdbc:mysql://localhost/activemq? relaxAutoCommit=true"/>
         <property name="username" value="activemq"/>
         <property name="password" value="activemq"/>
         <property name="poolPreparedStatements" value="true"/>
     </bean> 
5.2.4 KahaDB

KahaDB是一个基于文件的持久性数据库,它位于使用它的消息代理的本地。它已经为快速持久性进行 了优化。它是ActiveMQ 5.4以来的默认存储机制。KahaDB使用更少的文件描述符,并且比它的前身 AMQ消息存储提供更快的恢复。【官方推荐使用的持久化存储方式】

<persistenceAdapter>
       <kahaDB directory="${activemq.data}/kahadb"/>
 </persistenceAdapter>
image.png
db log files:以db-递增数字.log命名。
archive directory: 当配置支持archiving(默认不支持)并且存在,该文件夹才会创建。用于存储不再 需要的data logs。
db.data:存储btree索引 
db.redo:用于hard-stop broker后,btree索引的重建
5.2.5 Multi(m) kahaDB Persistence Adapter

ActiveMQ 5.6:可以跨多个kahdb持久性适配器分发目标存储。你什么时候会这么做?如果您有一个快速 的生产者/消费者目的地和另一个定期的生产者目的地,它们具有不规则的批处理消费,那么随着未消 费的消息分布在多个日志文件中,磁盘使用可能会失控。每个一个单独的日志,可以确保最小限度地使 用日志。此外,有些目的地可能很重要,需要磁盘同步,而有些则不是。在这些情况下,您可以使用 mKahaDB持久性适配器,并使用通配符过滤目的地,就像使用目的地策略一样。
Each instance of kahaDB can be configured independently. If no destination is supplied to a filteredKahaDB , the implicit default value will match any destination, queue or topic. This is a handy catch all. If no matching persistence adapter can be found, destination creation will fail with an exception. The filteredKahaDB shares its wildcard matching rules with Per Destination Policies. From ActiveMQ 5.15, filteredKahaDB support a StoreUsage attribute named usage . This allows individual disk limits to be imposed on matching queues.

<broker brokerName="broker">
 <persistenceAdapter>
  <mKahaDB directory="${activemq.base}/data/kahadb">    
    <filteredPersistenceAdapters>
      <!-- match all queues -->
      <filteredKahaDB queue=">">
        <usage>
         <storeUsage limit="1g" />
        </usage>
        <persistenceAdapter>
          <kahaDB journalMaxFileLength="32mb"/>
        </persistenceAdapter>
      </filteredKahaDB>
            <!-- match all destinations -->
      <filteredKahaDB>
        <persistenceAdapter>
          <kahaDB enableJournalDiskSyncs="false"/>
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
  </mKahaDB>
 </persistenceAdapter>
</broker>
Automatic Per Destination Persistence Adapter

Set perDestination="true" on the catch all, i.e., when no explicit destination is set, filteredKahaDB entry. Each matching destination will be assigned its own kahaDB instance.

<broker brokerName="broker">
 <persistenceAdapter>
  <mKahaDB directory="${activemq.base}/data/kahadb">    
  <filteredPersistenceAdapters>
      <!-- kahaDB per destinations -->
      <filteredKahaDB perDestination="true">
        <persistenceAdapter>
          <kahaDB journalMaxFileLength="32mb"/>
        </persistenceAdapter>
      </filteredKahaDB>
    </filteredPersistenceAdapters>
</mKahaDB>
 </persistenceAdapter>
</broker>

6 协议

6.1 ActiveMQ支持多种协议
6.2 协议的配置方式

http://activemq.apache.org/configuring-transports.html

<transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616? maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672? maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613? maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883? maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614? maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
6.3 OpenWire

OpenWire 是Apache的一种跨语言的协议,允许从不同的语言和平台访问ActiveMQ,是ActiveMQ 4.x 以后的版本默认的传输协议。
openwire简介:http://activemq.apache.org/openwire
openwire协议说明:http://activemq.apache.org/openwire-version-2-specification.html

6.4 AMQP

服务端配置说明:http://activemq.apache.org/amqp
客户端使用:

  1. 引入AQMP实现客户端
        <dependency>
            <groupId>org.apache.qpid</groupId>
            <artifactId>qpid-jms-client</artifactId>
            <version>0.37.0</version>
        </dependency>
  1. 连接工厂换成如下的 JmsConnectionFactory
// 1、创建连接工厂                
JmsConnectionFactory connectionFactory = new JmsConnectionFactory(null, null, brokerUrl);
  1. 其他步骤不变。
6.5 MQTT

MQTT(Message Queuing Telemetry Transport)消息队列遥测传输是IBM开发的一个即时通讯协 议,已成为物联网通信的标准。
服务端配置说明:http://activemq.apache.org/mqtt

客户端使用:

  1. 引入mqtt客户端jar
       <dependency>
            <groupId>org.fusesource.mqtt-client</groupId>
            <artifactId>mqtt-client</artifactId>
            <version>1.15</version>
        </dependency>
6.6 AUTO

Starting with version 5.13.0, ActiveMQ supports wire format protocol detection. OpenWire, STOMP, AMQP, and MQTT can be automatically detected. This allows one transport to be shared for all 4 types of clients. 从 ActiveMQ 5.13.0开始,ActiveMQ 开始支持协议格式检测,可以自动检测OpenWire、STOMP、 AMQP和MQTT。允许这4种类型的客户端共享一个传输。
官网说明:http://activemq.apache.org/auto

7 安全

ActiveMQ默认并未开启安全访问控制。支持对Queue、Topic的认证、鉴权。 https://activemq.apache.org/security 认证 简单认证方式
在activemq.xml的broker中配置简单认证的插件及用户。

image.png

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容

  • ActiveMQ 即时通讯服务 浅析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk阅读 1,479评论 0 11
  • 一、ActiveMQ的安装 1.什么是ActiveMQ? ActiveMQ 是 Apache 出品,最流行的,能力...
    开源oo柒阅读 340评论 0 6
  • 1.简介 1.1. ActiveMQ 由Apache出品的开源消息总线。是一个完全支持JMS1.1和J2EE ...
    华木公子阅读 2,284评论 0 0
  • 书韵 我从未见过海洋, 但我知道波涛汹涌的模样。 我从未去过远方, 但我早已领略祖国的大好河山。 星空...
    欢乐洋阅读 61评论 0 0
  • 姣姣经常在吃饭时间,洗澡时间里让我跟她讲点有趣的事儿,她最近对心理学很感兴趣,会要求讲心理学方面的。 这个要求没有...
    Ada李力阅读 443评论 0 51