RabbitMQ(三) - 工作队列(Work Queues)

工作队列

rabbitmq-work-queues

上一个教程中,我们写了一个从一个已经命好名的队列中收发消息的程序。在这个教程中,我们将创建一个工作队列用来在多个工作者之间分发耗时(time-consuming)任务。

工作队列(又名:任务队列)背后的主要思想是避免立即做资源密集型的任务并且要等到它完成。相反,我们调度这个任务在以后完成。我们封装一个消息任务并把它发送给队列。一个工作进程在后台运行:取出任务并最终执行这个任务。如果跑了多个工作任务,那么消息被它们共享。

这些概念在web应用中是特别有用的,在很短的http请求完成一个复杂的任务。

准备

在前面的一个教程中我们发送了一个包含“Hello World!”的消息。现在我们打算发送一个字符串来代替一个复杂的任务。我们没有一个真正的任务,比如改变图片大小或者渲染一个pdf文件,我们假装我们很忙-通过使用Thread.sleep()方法。我们将以.的数量来表示任务的复杂度。每一个点表示需要“工作”1s,例如:一个假任务描述为:Hello...表示需要花费3秒的时间。

我们将简单修改一下我们之前的例子Send.java,官网的例子是用命令行,但是我们用IDE,所以不和官网的一样了。官网用命令行连发了5条消息,我们将用for循环来实现。新的程序我们命名为NewTask.java,以下是所有代码:

package com.roachfu.tutorial.rabbitmq.website.workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作队列例子
 */
public class NewTask {

    private static final String QUEUE_NAME = "work.queue";

    private static final String[] strings = {
            "First message.",
            "Second message..",
            "Third message...",
            "Fourth message....",
            "Fifth message....."
    };

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        String[] messages = strings;
        for (String message: messages){
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Send '" + message + "'");
        }

        channel.close();
        connection.close();
    }

}

我们老的Recv.java也是只需要稍微修改一下。只需要对.进行一个处理,我们将新的程序命名为Worker.java。以下是全部代码:

package com.roachfu.tutorial.rabbitmq.website.workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作队列消费端
 */
public class Worker {

    private static final String QUEUE_NAME = "work.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

                try {
                    doWork(message);
                }finally {
                    System.out.println(" [x] Done");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);

    }

    private static void doWork(String message) {
        for (char ch : message.toCharArray()){
            if (ch == '.'){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

轮循分发(Round-robin dispatching)

任务队列的一个好处就是能很简单的并行工作。如果我们有积压的工作,我们只需要添加更多的工作者,很容易扩展。

首先,让我们同时跑两个工作者实例。它们都将从队列中获取消息,结果怎样,我们拭目以待。

你需要开三个控制台,两个跑工作者程序。这两个就是我们的消费者 - C1C2

rabbitmq-worker

第三个控制台我们发布新的任务。一旦你启动好了消费者们,你就可以发布消息了:即执行生产者程序NewTask.java。以下是执行结果:

rabbitmq-newtask

下面是当执行上面的程序之后,两个worker的输出:

worker-1

rabbitmq-worker-1

worker-2

rabbitmq-worker-2

默认的,RabbitMQ会有序的将一个个消息交付给下一个消费者。平均每个消费者将会得到相同数量的消息。这种分发消息的方式叫轮循(round-robin)。可以尝试3个或更多的工作者。

消息确认(message acknowledgment)

完成一个任务需要花费一些时间,你可以想象一个需要较长时间完成的任务在执行的中途中挂了会发生什么。我们当前的代码,一旦RabbitMQ将消息交付给客户,它将立即从内存中被移除。在这个例子中,如果你在执行过程中杀死一个工作者我们将丢失这个消息。我们也会丢失所有分发给指定的这个工作者但是还没有处理的消息。

但是我们不想丢失任何任务。如果一个工作者挂了,我们希望这个任务能交付给另一个工作者。

为了确保消息永远不会丢失,RabbitMQ提供了消息确认机制,消费者向RabbitMQ中发送一个确认表示消息已经接收、处理并且RabbitMQ可以自由的删除它了。

如果一个消费者挂了(通道关闭,连接关闭或者TCP连接丢失)没有发送确认,RabbitMQ将会理解成消息没有被完全处理并将消息发回队列。如果这个时候有其他消费者在线,它将被快速的交付给另一个消费者。通过这种方式能确保没有消息丢失,即使工作者偶尔挂掉。

这里没有消息超时,当消费者挂了RabbitMQ将会重新交付消息。如果一个消息的处理过程花费了很长很长的时间,这个是允许的。

消息确认默认是打开的,在上一个例子中我们可以通过设置标志autoAck=true将其显示的关掉。这里我们将标志设置为false并当我们完成任务后发送一个确认。以下是修改后的代码片段:

//只接受一个未确认的消息(见下文)
channel.basicQos(1);

Consumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");

        try {
            doWork(message);
        }finally {
            System.out.println(" [x] Done");
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    }
};
// 自动确认设置为false
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

使用这些代码,我们能确保即使我们在消息处理的过程中杀死消费者,也不会有消息丢失,因为所有未确认的消息在工作线程挂掉后都会被重新交付到其他消费者。

消息持久化(Message Durability)

我们已经学习如何确保即使消费者挂了,消息也不会丢失。但是如果RabbitMQ服务挂了,我们还是会丢失我们的消息。

当RabbitMQ退出或者崩溃,它将忽略掉队列和消息,除非你告诉它不要忽略。两个步骤确保消息不会丢失:我们需要将队列和消息都标志为持久化的。

第一,我们需要保证RabbitMQ永远不会丢失我们的队列。为了能实现它,我们需要将其定义为持久化的。

boolean durable = true;
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

如果QUEUE_NAME这个队列名在之前就已经使用,并且没有设置为持久化,那么我们需要重新设置一个队列名,不然就会有冲突。RabbitMQ是不允许设置一个既是持久化又是非持久化的队列存在的。

上面的定义需要将消费者和生产者都改掉。

在这里即使RabbitMQ重启我们也能确保队列不会丢失。现在我们需要使我们的消息是持久化的——通过设置MessageProperties(它实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));

注意:消息持久化

设置消息持久化并不能完全保证消息不会丢失。即使告知RabbitMQ将消息持久化到磁盘,但是RabbitMQ还是会出现已经接受到消息但是却没有保存它的情况。并且,RabbitMQ不会为每个消息执行fsync(2)——它可能只是保存到内存中但是没有真的写到磁盘中。持久化并不是强持久化的,但是对于简单的任务队列已经足够使用了。如果你需要一个强持久化,你可以使用publisher confirms

公平分发

你可能已经注意到分发还是不能完全的达到我们想要的效果。例如:有这么一种情形,对于两个工作者,基数任务很繁重,偶数任务很轻松,一个工作者就会不间断的工作而另一个将几乎不做什么任务。然而RabbitMQ并不知道这些,依然在这样分发消息。

发生这种情况的原因是RabbitMQ只是单纯的当消息发送到队列后将消息进行分发。它并不关心消费者未确认的消息数量。它只是盲目的将每N个消息发送给n个消费者。

rabbitmq-prefetchcount

我们可以使用basicQos方法设置prefetchCount = 1防止这样的失败。这个将告知RabbitMQ不要在同一时间将很多消息给消费者。换句话说,在前一个消息完成并确认之前不要再将新的消息分发给这个消费者。而是将消息分发到下一个不忙的消费者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意:队列大小

如果所有的消息者都在忙,你的队列又使用完了。你需要关注这个,也许需要增加更多的消费者,或者其他的策略。

以下是所有的代码整合

NewTask.java

package com.roachfu.tutorial.rabbitmq.website.workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作队列例子
 */
public class NewTask {

    private static final String QUEUE_NAME = "task.queue";

    private static final String[] strings = {
            "First message.",
            "Second message..",
            "Third message...",
            "Fourth message....",
            "Fifth message....."
    };

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

        String[] messages = strings;
        for (String message: messages){
            channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
            System.out.println(" [x] Send '" + message + "'");
        }

        channel.close();
        connection.close();
    }

}

Worker.java

package com.roachfu.tutorial.rabbitmq.website.workqueues;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 工作队列消费端
 */
public class Worker {

    private static final String QUEUE_NAME = "task.queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

        //只接受一个未确认的消息(见下文)
        channel.basicQos(1);

        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");

                try {
                    doWork(message);
                }finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // 自动确认设置为false
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);

    }

    private static void doWork(String message) {
        for (char ch : message.toCharArray()){
            if (ch == '.'){
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

下个教程我们将学习怎么将相同的消息交付给多个消费者。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,045评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,114评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,120评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,902评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,828评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,132评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,590评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,258评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,408评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,335评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,385评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,068评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,660评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,747评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,967评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,406评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,970评论 2 341

推荐阅读更多精彩内容

  • 来源 RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控...
    jiangmo阅读 10,339评论 2 34
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,566评论 18 139
  • RabbitMQ详解 本文地址:http://www.host900.com/index.php/articles...
    嘉加家佳七阅读 2,497评论 0 9
  • 1. 历史 RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的...
    高广超阅读 6,091评论 3 51
  • 幸福来源于内心,植根于选择。 中学时以为考上大学就轻松了,上大学时以为工作以后就自由了,谈恋爱时以为结婚以后就幸福...
    不雨萧潇阅读 378评论 0 3