ActiveMQ --- 入门篇

一、什么是MQ?

MQ,中文名字叫做消息中间件。既然是中间件,那么就说明它左边有东西,右边也有东西。那么左边是什么?右边又是什么呢?MQ在中间能干嘛呢?看看下面的例子。

1、生活中的case:
老师讲完了练习,然后对同学们说有问题的现在就过来问。然后张三李四王五赵六都有问题要问。那么他们就按顺序排队。张三需要5分钟,然后是李四8分钟,再然后才是王五10分钟,最后是赵六。这就相当于dubbo的RPC远程调用。也就是说,张三问的时候老师这个系统只能响应张三,后面的人都得等着。这样就会导致学生和老师耦合度高,而且效率低,如果问问题的学生多,越后面的人等待的时间也越长,老师还会累死。怎么优化呢?

2、优化方案:
老师会叫同学们把需要问的问题按照约定的格式在纸上写好,然后交给班长。等老师解答完当前学生的问题,就从班长那里拿出一份问题。这样一来,同学们也不用干等着,交了问题后该干嘛就干嘛去,老师也可以选择适当的时间再解答,不会被累死。

这个案例中的班长就是一个中间件,它不处理真正的逻辑,只是一个中间人。学生不直接问老师,而是通过班长,使得学生和老师解耦了;其次,学生上午交的问题,可能下午才得到老师的解答,整个过程是异步的;即便有一大群学生来问问题,这些请求也会堆积在班长那里,可以帮老师抵流量冲击,而不会影响到老师。综上:
MQ的作用:

  • 异步
  • 解耦
  • 削峰



欢迎大家关注我的公众号 javawebkf,目前正在慢慢地将简书文章搬到公众号,以后简书和公众号文章将同步更新,且简书上的付费文章在公众号上将免费。


二、activeMQ的安装

  • 首先从官网下载activeMQ (linux版本);
  • 然后解压就行了(activeMQ是java编写的,所以需要安装JDK)。

进入到bin目录,然后执行如下命令:

  • 启动:./activemq start
  • 指定xml配置文件启动:./activemq start xbean:file:/文件路径
  • 关闭:./activemq stop
  • 重启:./activemq restart

activeMQ的后台启动端口是 61616,要想查看是否启动成功,有如下几种方式:

  • ps -ef | grep activemq| grep -v grep
  • netstat -anp | grep 61616
  • lsof -i:61616

activemq还有一个图形界面,端口是 8161。首先保证你的 Linux 虚拟机和 windows 的 ip 处于同一个网段,然后确保没有被防火墙给屏蔽,在Linux 和 windows 上互 ping 一下。能 ping 通后,就在 浏览器访问 192.168.x.xx:8161, 默认的用户名和密码都是 admin。访问后可以看到如下界面:


activemq的图形界面


三、activeMQ怎么玩?

上面举了生活中的例子来说明MQ的作用,说白了就是我们先把问题发到MQ中,然后从MQ中取出消息。那么具体是发送到MQ中的什么位置呢?这个位置我们管它叫destination,即目的地。
目的地有以下两种:

  • 队列queue(点对点);
  • 主题topic(发布与订阅);



1、点对点传输:
所谓点对点传输,可以理解为发私信。你发了一条消息给你女朋友,只有你女朋友能收到。那接下来就看看怎么发消息和收消息。首先添加依赖:

 <!-- activemq-all -->
 <dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-all</artifactId>
      <version>5.15.8</version>
 </dependency>
 <dependency>
      <groupId>org.apache.xbean</groupId>
      <artifactId>xbean-spring</artifactId>
      <version>4.12</version>
 </dependency>
  • 生产消息:
public class Productor {
    private static final String URL = "tcp://192.168.0.103:61616";
    private static final String QUEUE_NAME = "queue_test";

    public static void main(String[] args) throws Exception {
        // 1. 创建factory工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 2. 创建connection连接
        Connection connection = factory.createConnection();
        connection.start();
        // 3. 创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 创建目的地queue
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        // 5. 生产消息
        for (int i = 1; i <= 3; i++) {
            TextMessage message = session.createTextMessage("queue" + 1);
            // 6. 将消息发送到MQ
            producer.send(message);
        }
        // 7. 关闭资源(顺着申请,倒着关闭)
        producer.close();
        session.close();
        connection.close();
        System.out.println("发送到MQ完成!");
    }
}

运行后,就可以在8161端口看到如下信息了:


生产消息
  • 消费消息:
public class Consumer {
    private static final String URL = "tcp://192.168.0.103:61616";
    private static final String QUEUE_NAME = "queue_test";

    public static void main(String[] args) throws Exception {
        // 1. 创建factory工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 2. 创建connection连接
        Connection connection = factory.createConnection();
        connection.start();
        // 3. 创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 创建目的地queue
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer consumer = session.createConsumer(queue);
        // 5. 消费消息
        while (true){
            // receive里面的参数表示超时时间
            TextMessage message = (TextMessage) consumer.receive(3000);
            if (message != null)
                System.out.println(message.getText());
            else
                break;
        }
        // 6. 关闭资源(顺着申请,倒着关闭)
        consumer.close();
        session.close();
        connection.close();
        System.out.println("3秒还没消息来,我溜了!");
    }
}

运行后,在8161端口就可以看到如下变化:


消费消息

可以看到消息队列为3,出列的也是3,说明消费完了。

  • 异步监听的方式消费消息:
    异步相对的就是同步,上面那种方式就是同步的。就是调用receive方法来接收消息,在没接收到消息或超时之前,程序将一直阻塞。在上面那段代码中,receive方法设置了3秒的超时时间,假如MQ中此刻没有消息供消费,那么程序将要在3秒后才能输出 “3秒还没消息,我溜了!” 这句话。异步就是不会阻塞,即使没收到消息,程序还是该干嘛就干嘛。异步监听方式写法如下:
TextMessage message = (TextMessage) consumer.receive();
consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message != null && message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("收到消息: " +  textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();
        // 6. 关闭资源(顺着申请,倒着关闭)
        consumer.close();
        session.close();
        connection.close();
  • 启动顺序问题:
    -- 先启动生产者,再依次启动两个消费者:
    ------- 先启动的消费者可以拿到消息,后启动的就不能消费了。结论:消息不能被重复消费。
    -- 先启动两个消费者,再启动生产者生产消息:
    ------- 结果就是两个消费者一人消费一半。
  • 小总结:

从上面生产消息和消费消息的demo中可以发现,其步骤其实和JDBC操作数据库差不多,都是先创建factory,然后通过factory创建connection连接,再创建session,最后执行操作的是session。点对点传输还有如下特点:

  • 每条消息只能有一个消费者,也就是上面说的消息不能被重复消费;
  • 消息生产者和消费者没有时间上的关联,生产消息时不用管是不是有人消费,消费者也随时可以提取消息;
  • 消息被消费后将不会再存储,用过就没了。



2、发布与订阅:
上面说了点对点,就是你跟你女朋友发微信。那么发布与订阅就是你在微信公众号发推文,凡是关注了你公众号的人都能收到消息。点对点的目的地是queue,发布与订阅的目的地是topic,每条消息可以有多个消费者;生产者和消费者有时间上的关联,订阅了某个topic,只能消费你订阅之后的消息,说简单就是,关注了你公众号的人,他不能收到在他关注你之前的消息;假如无人订阅就去生产,那就是一条废消息,没有人关注你的公众号,那么你发的推文就没有意思,就是一条废消息,所以一般会先启动消费者,再启动生产者。

关于发布与订阅,相比点对点,只需要把queue改成topic就可以了,这里就不再贴代码了。

关于topic和queue的区别,如下表所示:

  topic queue
工作模式 一对多 一对一
状态 无状态 queue数据会在mq服务器上以文件形式保存,也可配置成DB存储
完整性 如果没有订阅者,消息将被丢弃 消息不会被丢弃
处理效率 随着订阅者的增加效率会降低 由于一条消息只发给一个消费者,所以消费者再多也不会明显地影响性能


四、关于JMS

1、什么是JMS?
JMS中文名叫Java消息服务,它是一种规范,是javaEE的13种核心规范之一。关于javaEE的13种核心规范,网上一搜一大堆,这里不再赘述。JMS就是天上飞的理念,而各种MQ就是这种理念的落地实现。比如activeMQ、rocketMQ等,都要遵循JMS这个规范。


2、JMS的结构和特点:

  • JMS结构:
    • JMS Provider:实现了JMS接口和规范的消息中间件,像activeMQ、rocketMQ等
    • JMS Producer:消息生产者
    • JMS consumer:消息消费者
    • JMS message:消息
      • 消息头

        • JMSDestination:目的地,queue和topic

        • JMSDeliveryMode:分为持久和非持久模式。持久模式意味着消息即使JMS提供者出现故障,该消息并不会丢失,会在服务器恢复后再次发送;反之,非持久模式就是服务器出现故障,该消息将永久丢失。

        • JMSExpiration:消息过期时间,如果为0,表示永不过期。

        • JMSPriority:优先级,0到4是普通消息,5到9是加急消息,默认是4。

        • JMSMessageID:消息的唯一标识,由MQ生成。

      • 消息体

        • 封装的具体消息数据就是消息体

        • 消息体格式,有5种,常用的 TextMessage(String类型) 和 MapMessage(key、value形式)

        • 发送和接收的消息体类型必须对应一致

      • 消息属性

        • 是什么:一个对象的属性能干嘛?用来描述这个对象的特点嘛,消息属性也一样地理解就好了。

        • 如果需要除消息头字段以外的值,可以使用消息属性

        • 消息属性可以用来做识别/去重/重点标注等操作,设置消息属性的方法如下:

TextMessage textMessage = new session.createTextMessage("这是一条TextMessage");
// TextMessage 类型设置消息属性
textMessage.setStringProperty("property", "VIP");

在消费者中取出消息后:

textMessage.getStringProperty("property")

即可取出消息属性。
注意上面JMS结构的层级关系。

3、如何保证消息的可靠性?(面试重点)
一般要从三个角度去回答(持久性、事务、签收)。


  • 持久性:持久,是MQ挂了,消息依然存在,非持久,就是MQ挂了,消息就没了。

队列生产者的持久性:

// 这个producer是队列
 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久
 producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久

队列设置为非持久,如果生产者将消息发送到MQ后,MQ挂了,那么这些消息就没了,即使MQ恢复正常也没了。队列设置为持久,那么消息只要还没消费就还会有。activeMQ的队列默认设置了持久,可保证消息只被传送一次和成功使用一次。


主题的持久性:
主题要设置持久,生产者和消费者的编码方式与之前都有点儿不一样,代码如下:

public class Consumer {
    private static final String URL = "tcp://192.168.x.xxx:61616";
    private static final String TOPIC_NAME = "topic_test";

    public static void main(String[] args) throws Exception {
        // 1. 创建factory工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 2. 创建connection连接
        Connection connection = factory.createConnection();
        connection.setClientID("张三");
        System.out.println("张三订阅");
        // 3. 创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 订阅topic
        Topic topic = session.createTopic(TOPIC_NAME);
        TopicSubscriber subscriber = session.createDurableSubscriber(topic,"备注信息");
        // 5. 启动
        connection.start();
        // 6. 消费topic的消息
        Message message = subscriber.receive();
        while (null != message){
            TextMessage textMessage = (TextMessage) message;
            System.out.println("收到消息:" +  textMessage.getText());
            message = subscriber.receive(5000L);
        }
        // 6. 关闭资源(顺着申请,倒着关闭)
        session.close();
        connection.close();
        System.out.println("5秒还没消息来,我溜了!");
    }
}
public class Productor {
    private static final String URL = "tcp://192.168.0.103:61616";
    private static final String TOPIC_NAME = "topic_test";

    public static void main(String[] args) throws Exception {
        // 1. 创建factory工厂
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
        // 2. 创建connection连接
        Connection connection = factory.createConnection();
        // 3. 创建session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 4. 创建目的地topic
        Topic topic = session.createTopic(TOPIC_NAME);
        MessageProducer producer = session.createProducer(topic);
        // 设置持久性
        //producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 非持久
        producer.setDeliveryMode(DeliveryMode.PERSISTENT); // 持久

        connection.start();
        // 5. 生产消息
        for (int i = 1; i <= 3; i++) {
            TextMessage message = session.createTextMessage("queue" + i);
            // 6. 将消息发送到MQ
            producer.send(message);
        }
        // 7. 关闭资源(顺着申请,倒着关闭)
        producer.close();
        session.close();
        connection.close();
        System.out.println("发送到MQ完成!");
    }
}

主题设置了持久的话,一定要先运行一次消费者,等于向MQ注册,表示我订阅了这个主题。然后再运行生产者发送信息,此时,不论消费者是否还在线,都会接收到消息,不在线的话,下次连接的时候,会把没有收过的消息都接收下来。


  • 事务:创建session的时候要传两个参数,一个是事务,一个是签收。

生产者事务:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

第一个参数就是表示事务,设置为false,表示只要执行了send方法,消息就进入到队列中了;如果设置为true,需要send后再执行commit,消息才会被提交到队列中。所以在session提交前,需要调commit方法,如下:

try{
  //没问题就提交事务
  session.commit();
}catch(Exception e){
  //有问题就回滚
  session.rollback();
}finally{
  producer.close();
  session.close();
}

生产者主事务,不管签收,因为消费者才需要签收嘛。生产者设置了事务,签收机制就无所谓了,只是这个方法需要传一个签收机制,其实事务设置为true后,起作用的就是事务了。


消费者事务:
如果消费者开启了事务,进行消费时而没有commit的话,MQ会认为你还没有成功消费消息,就会出现重复消费的情况,所以消费者一般不开启事务,而是以签收机制为主。


  • 签收:签收机制有四种,用得较多的是自动和手动两种方式。

消费者非事务的手动签收:

Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

如果这个时候直接运行消费者,发现又可以重复消费消息,因为MQ不知道你已经签收消息了。所以在receive到消息后,应该手动签收,才不会重复消费,如下:

while (null != message){
      TextMessage textMessage = (TextMessage) message;
      textMessage.acknowledge(); // 手动签收
      System.out.println("收到消息:" +  textMessage.getText());
      message = subscriber.receive(5000L);
}

消费者开启事务的情况下的签收:

Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);

开启了事务,就会自动设置为自动签收,即使后面那个参数设置了手动签收,也不起作用了。所以,不需要调用acknowledge()方法进行签收。如果开启了事务,设置了手动签收,调用了acknowledge()方法,但是没有commit,还是会重复消费。


总之,在事务会话中,当一个事务被成功提交则消息被自动签收,如果事务回滚,则消息会被再次传递。非事务会话中,消息何时被确认取决于创建会话时的签收模式。

小结:不能容忍丢失消息,就用持久订阅,可以容忍丢失消息,就用非持久订阅。

五、activeMQ的broker

1、什么是broker?
broker就是嵌入式的activemq,也就是说,使用broker,只需要引入相关依赖就可以了,而不需要你本地安装activemq,类似于springboot那样内嵌tomcat。

2、怎么用?
除了之前引入的activemq-all,还需要引入如下依赖:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.9.3</version>
</dependency>

然后编码:

public static void main(String[] args) throws Exception{
        BrokerService service = new BrokerService();
        service.setUseJmx(true);
        service.addConnector("tcp://localhost:61616");
        service.start();
}

运行后,就可以在控制台看到这个嵌入式的activemq已经启动了。


image.png
未完待续……
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,200评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,526评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,321评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,601评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,446评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,345评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,753评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,405评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,712评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,743评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,529评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,369评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,770评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,026评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,301评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,732评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,927评论 2 336

推荐阅读更多精彩内容