Java-JMS消息队列(ActiveMQ)

<blockquote><h4>认识消息队列</h4></blockquote>

  “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含对象……
  消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。

<blockquote><h4>JMS消息队列</h4></blockquote>

  Jms即[Java消息服务](http://baike.baidu.com/view/3292569.htm)(Java Message Service)[应用程序](http://baike.baidu.com/view/330120.htm)接口是一个[Java平台](http://baike.baidu.com/view/209634.htm)中关于面向[消息中间件](http://baike.baidu.com/view/3118541.htm)(MOM)的API,用于在两个应用程序之间,或[分布式系统](http://baike.baidu.com/view/991489.htm)中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供[商都](http://baike.baidu.com/view/19763.htm)对JMS提供支持。
  JMS(Java Messaging Service)是[Java](http://baike.baidu.com/view/29.htm)平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java[应用程序](http://baike.baidu.com/view/330120.htm)进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化[企业](http://baike.baidu.com/view/38340.htm)应用的开发,翻译为[Java](http://baike.baidu.com/view/29.htm)消息[服务](http://baike.baidu.com/view/133203.htm)。

<blockquote><h4>JMS对象模型</h4></blockquote>

1)连接工厂。连接工厂(ConnectionFactory)是由管理员创建,并绑定到JNDI树中。客户端使用JNDI查找连接工厂,然后利用连接工厂创建一个JMS连接。
2)JMS连接。JMS连接(Connection)表示JMS客户端和服务器端之间的一个活动的连接,是由客户端通过调用连接工厂的方法建立的。   
3)JMS会话。JMS会话(Session)表示JMS客户与JMS服务器之间的会话状态。JMS会话建立在JMS连接上,表示客户与服务器之间的一个会话线程。   
4)JMS目的。JMS目的(Destination),又称为消息队列,是实际的消息源。
5)JMS生产者和消费者。生产者(Message Producer)和消费者(Message Consumer)对象由Session对象创建,用于发送和接收消息。
6)JMS消息通常有两种类型:   ① 点对点(Point-to-Point)。在点对点的消息系统中,消息分发给一个单独的使用者。点对点消息往往与队列(javax.jms.Queue)相关联。   ② 发布/订阅(Publish/Subscribe)。发布/订阅消息系统支持一个事件驱动模型,消息生产者和消费者都参与消息的传递。生产者发布事件,而使用者订阅感兴趣的事件,并使用事件。该类型消息一般与特定的主题(javax.jms.Topic)关联。
<blockquote><h4>ActiveMQ代码实现</h4></blockquote>

1)下载ActiveMQ
去官方网站下载:http://activemq.apache.org/
2)解压运行AciveMQ
解压apache-activemq-5.13.3-bin.zip文件,运行apache-activemq-5.13.3\bin\win64\activemq.bat,启动ActiveMQ,登录http://localhost:8161/admin/,创建一个Queues,命名为my-activemq
3)创建Maven项目,添加依赖POM.xml

dependencies>
<!-- activemq 相关maven依赖 -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.7.0</version>
        </dependency>
<!-- 日志相关依赖 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>

4)创建消息发送者(生产者)Sender.java

package com.zxp.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 发送者
 * 2016年5月6日 下午3:00:57
 * @author zhangxiaoping
 */
public class Sender {
    private static final Logger LOGGER=LoggerFactory.getLogger(Sender.class);
    //默认代理地址 "failover://tcp://localhost:61616"  服务器地址不同IP修改不同的IP
    private static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
    //消息队列名称 
    private static final String SUBJECT="my-activemq";
    private static int i=1;
    public static void main(String[] args) throws JMSException, InterruptedException {
        //初始化连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
        //建立连接
        Connection conn= connectionFactory.createConnection();
        //启动连接
        conn.start();
         //创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
        Session session= conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //创建目标队列
        Destination dest = session.createQueue(SUBJECT);
        //通过session创建消息的发送者
        MessageProducer producer=session.createProducer(dest);
        while(true){
            //定义要发送的消息
            TextMessage message= session.createTextMessage("======ActiveMQ发送消息===="+i+"===");
            LOGGER.debug(message.getText());
            //发送消息
            producer.send(message);
            //休眠2秒
            Thread.sleep(2000);
            i++;
        }
//      conn.close();
        
    }

}

5)创建消息的接收者(消费者)Receiver.java

package com.zxp.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 接收者
 * 2016年5月6日 下午3:03:16
 * @author zhangxiaoping
 */
public class Receiver implements MessageListener{
    private static final Logger LOGGER=LoggerFactory.getLogger(Receiver.class);
    //默认代理地址 "failover://tcp://localhost:61616"  服务器地址不同IP修改不同的IP
    private static final String BROKER_URL=ActiveMQConnection.DEFAULT_BROKER_URL;
    //消息队列名称 
    private static final String SUBJECT="my-activemq";
    public static void main(String[] args) throws JMSException {
        //初始化连接工厂
        ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(BROKER_URL);
        //建立连接
        Connection conn= connectionFactory.createConnection();
        //启动连接
        conn.start();
         //创建Session,此方法第一个参数表示会话是否在事务中执行,第二个参数设定会话的应答模式
        Session session= conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目标队列
        Destination dest=session.createQueue(SUBJECT);
        //通过session创建消息的接收者
        MessageConsumer consumer= session.createConsumer(dest);
        //初始化监听
        Receiver receiver=new Receiver();
        //给接收者添加监听对象
        consumer.setMessageListener(receiver);
    }
    public void onMessage(Message arg0) {
        TextMessage message=(TextMessage) arg0;
        try {
            LOGGER.debug("接收到消息"+message.getText());
            Thread.sleep(4000);
        } catch (Exception e) {
            LOGGER.error("error"+e.getMessage());
        }
        
    }
}

6)运行Sender.java、Receiver.java登录http://localhost:8161/admin/查看队列信息。

消息队列.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • ActiveMQ 即时通讯服务 浅析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk阅读 1,478评论 0 11
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,580评论 18 139
  • 一、 消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能...
    步积阅读 56,841评论 10 138
  • 1、前言 之前我们通过两篇文章(架构设计:系统间通信(19)——MQ:消息协议(上)、架构设计:系统间通信(20)...
    境里婆娑阅读 1,867评论 0 4
  • 1 消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,...
    Bobby0322阅读 10,845评论 0 24