rocketmq consumer优雅停机的探索

背景

最近在项目(一个dubbo服务)发布的时候经常报这个错

nested exception is org.apache.ibatis.exceptions.PersistenceException: ### Error querying database. Cause: org.springframework.jdbc.CannotGetJdbcConnectionException: Could not get JDBC Connection; nested exception is com.alibaba.druid.pool.DataSourceClosedException: dataSource already closed

简单看来是数据库连接已经关闭了,但还在执行逻辑。很奇怪,发布时,dubbo是会先被从zk中摘除掉,再停止服务,按道理说是不会有这种错报出来,接着看调用栈发现是rocketmq consumer中报出来的,原来rocketmq consumer线程一直在跑,并没有停止,所以在spring容器关闭的时候,还在执行的逻辑会报错。
其中consumer启动代码大致如下:

public void init() throws MQClientException {
    consumer = new DefaultMQPushConsumer(groupName);
    consumer.setNamesrvAddr(namesrvAddr);  consumer.subscribe(topic, tag);
    consumer.setConsumeMessageBatchMaxSize(getConsumeMessageBatchMaxSize());
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.setMessageModel(MessageModel.CLUSTERING);
    consumer.registerMessageListener(new MsgListener());
    if (getInstanceName() != null) {
        consumer.setInstanceName(getInstanceName());
    }
    consumer.start();

    Thread shutdownThread = new Thread(new Runnable() {
        public void run() {
            if (consumer != null) {
                consumer.shutdown();
            }
        }
    }, this.getClass().getName() + "MsgListener");
    shutdownThread.setPriority(Thread.MAX_PRIORITY);
}

...
public void destroy() {
    if (consumer != null) {
        consumer.shutdown();
    }
 }

虽然注册了一个shutdown的hook,而且设置了最高优先级,但从实际中看来是没有达到预期的效果的。
查了资料,有这么一段描述:

  • Thread.setPriority()可能根本不做任何事情,这跟你的操作系统和虚拟机版本有关
  • 线程优先级对于不同的线程调度器可能有不同的含义,可能并不是你直观的推测。特别地,优先级并不一定是指CPU的分享。在UNIX系统,优先级或多或少可以认为是CPU的分配,但Windows不是这样
  • 线程的优先级通常是全局的和局部的优先级设定的组合。Java的setPriority()方法只应用于局部的优先级。换句话说,你不能在整个可能的范围 内设定优先级。(这通常是一种保护的方式,你大概不希望鼠标指针的线程或者处理音频数据的线程被其它随机的用户线程所抢占)
  • 不同的系统有不同的线程优先级的取值范围,但是Java定义了10个级别(1-10)。这样就有可能出现几个线程在一个操作系统里有不同的优先级,在另外一个操作系统里却有相同的优先级(并因此可能有意想不到的行为)
  • 操作系统可能(并通常这么做)根据线程的优先级给线程添加一些专有的行为(例如”only give a quantum boost if the priority is below X“)。这里再重复一次,优先级的定义有部分在不同系统间有差别。
  • 大多数操作系统的线程调度器实际上执行的是在战略的角度上对线程的优先级做临时操作(例如当一个线程接收到它所等待的一个事件或者I/O),通常操作系统知道最多,试图手工控制优先级可能只会干扰这个系统。
  • 你的应用程序通常不知道有哪些其它进程运行的线程,所以对于整个系统来说,变更一个线程的优先级所带来的影响是难于预测的。例如你可能发现,你有一个预期 为偶尔在后台运行的低优先级的线程几乎没有运行,原因是一个病毒监控程序在一个稍微高一点的优先级(但仍然低于普通的优先级)上运行,并且无法预计你程序 的性能,它会根据你的客户使用的防病毒程序不同而不同。

总结一句话就是优先级高的在全局范围内不一定会优先执行。

注册的spring bean 的destroy执行顺序也有问题,他不一定是在db connect关闭前执行,也就是说两个bean的销毁并没有先后顺序,这点看下面的分析。

spring依赖调整尝试

由于整个项目是基于spring容器管理的,所以第一想到的是调整spring销毁bean的顺序,如果最先销毁consumer这个bean问题就解决了。我们看下面这个例子,有两个bean,A和B,A依赖B

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="a" class="depend.A" init-method="init" destroy-method="destroy" >
    </bean>
    <bean id="b" class="depend.B" init-method="init" destroy-method="destroy">
    </bean>
</beans>
package depend;

import javax.annotation.Resource;

public class A {

    @Resource
    private B b;

    public B getB() {
        return b;
    }

    public void setB(B b) {
        this.b = b;
    }

    public void init() {
        System.out.println("A init");
    }

    public void destroy() {
        System.out.println("A destroy");
    }

    public void doSomething() {
        System.out.println("A doSomething");
    }

}
package depend;

public class B {

    public void init() {
        System.out.println("B init");
    }

    public void destroy() {
        System.out.println("B destroy");
    }

    public void doSomething() {
        System.out.println("B doSomething");
    }

}
package main;

import depend.A;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class DependMain {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("Depend.xml");
        A a = (A) context.getBean("a");
        a.doSomething();
        context.registerShutdownHook();
    }
}

项目中也是用@Resource来注入依赖的,我们执行一下main发现可能会有如下的输出:

A init
B init
A doSomething
B destroy
A destroy

说明A在初始化时并没有在B之后,而且A也是在B销毁后才销毁的,说明了@Resource并没有处理依赖关系。
稍微修改一下xml配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="a" class="depend.A" init-method="init" destroy-method="destroy" depends-on="b">
    </bean>
    <bean id="b" class="depend.B" init-method="init" destroy-method="destroy">
    </bean>
</beans>

加一个depend-on,发现输出是这样的:

B init
A init
A doSomething
A destroy
B destroy

说明依赖生效了,同时我发现这样写也是可以处理依赖关系的:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="a" class="depend.A" init-method="init" destroy-method="destroy">
        <property name="b" ref="b"/>
    </bean>
    <bean id="b" class="depend.B" init-method="init" destroy-method="destroy">
    </bean>
</beans>

这也解释了为什么只有Dao调用db connect时报错,Dao就是用property依赖db connect的,Spring在关闭时一定是先关闭db connect,再关闭Dao,而Spring的bean在还有引用的时候是不会被回收的,所以只有db connect断开连接会报错。

拿到业务上来说,mq consumer依赖biz1Service,biz1Service依赖biz2Service,biz2Service依赖Dao,Dao依赖db connect,这要是梳理出这个depend-on实在是有点麻烦,于是放弃了这个想法。

设置环境变量尝试

找公司的java大神问了一下,他给出了一个建议是在应用启动时设置一个环境变量,假设叫MQ_RUN=1,当rocketmq消费消息时读取该变量,判断是MQ_RUN==1,成立则继续执行,否则
return ConsumeConcurrentlyStatus.RECONSUME_LATER,保证消息不丢失,当应用关闭时设置这个环境变量为MQ_RUN=0,sleep个几秒钟,再去真正关闭应用。思路大致是有了,试一下发现环境变量不是这么好设置的,因为export MQ_RUN=1定义的变量,会对自己所在的shell进程及其子进程生效,启动时好说,应用是启动脚本的子进程,MQ_RUN=1设置没问题,停止时,停止脚本是发一个信号给应用,这两个进程没有关系,设置MQ_RUN=0也就无效了。

信号尝试

由于上面的铺垫,想到了使用类似健康检查的方式来停止consumer的消费,写一个接口来设置这个环境变量,关闭应用脚本时执行一下,但是这样做会有风险,接口是对外暴露的,任何人都可以请求这个接口来关闭consumer的消费。于是又想到了信号,参考我之前的文章《如何正确地杀死你的进程》,在应用中监听一个信号,应用关闭脚本中先向应用发送一个信号,kill -x $pid,应用中监听这个信号设置MQ_RUN=0,让消费停止,再去关闭应用即可。

最后说一句

其实这个报错对线上一点都不影响,rocketmq采取的是消费ack的方式来确定消息是否消费,当报错时消息又会被塞回broker,下次会继续消费,但这个问题也是有点意思,所以记录下来,最后的解决方式我觉得还是不是很优雅,目前能想到的也只有这样,我希望的是spring能有一种严格的依赖关系,并且不需要逐个设置,只需要打开全局配置即可。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,099评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,473评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,229评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,570评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,427评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,335评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,737评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,392评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,693评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,730评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,512评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,349评论 3 314
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,750评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,017评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,290评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,706评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,904评论 2 335

推荐阅读更多精彩内容