背景
某一天,和我们配合的中台组给我们部门发了一组新的MQ配置,用于支付回调消息的接收,原来我们的某个项目已经有一个MQ,所以项目需要适配两个MQ(该项目都是作为消费者的角色)。
spring rabbitmq使用的版本是
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
兼容多MQ的代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Slf4j
@Configuration
public class RabbitConfig1 {
@Bean(name = "connectionFactory1")
@Primary
public ConnectionFactory connectionFactory1 (
@Value("${spring.rabbitmq.host}") String host,
@Value("${spring.rabbitmq.port}") int port,
@Value("${spring.rabbitmq.username}") String username,
@Value("${spring.rabbitmq.password}") String password
) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory;
}
@Bean(name = "rabbitTemplate1")
@Primary
public RabbitTemplate rabbitTemplate1 (
@Qualifier("connectionFactory1") ConnectionFactory connectionFactory
) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}
@Bean(name = "listenerContainerFactory1")
public SimpleRabbitListenerContainerFactory listenerContainerFactory1 (
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("connectionFactory1") ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
}
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
@ConditionalOnProperty(name = "pay.callback.message.config.enable", havingValue = "true")
public class RabbitConfig2 {
@Bean(name = "connectionFactory2")
public ConnectionFactory connectionFactory2(
@Value("${pay.callback.rabbitmq.host}") String host,
@Value("${pay.callback.rabbitmq.port}") int port,
@Value("${pay.callback.rabbitmq.userName}") String userName,
@Value("${pay.callback.rabbitmq.password}") String password
) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
return connectionFactory;
}
@Bean(name = "listenerContainerFactory2")
public SimpleRabbitListenerContainerFactory listenerContainerFactory2 (
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("connectionFactory2") ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setDefaultRequeueRejected(false);
return factory;
}
}
测试
开发环境验证通过,发布到测试环境时,出现了以下异常
一下子就精神了,这就是臭名昭著的内存溢出
回顾以往出现内存溢出,往往有以下几种
内存溢出
堆空间溢出
java.lang.OutOfMemoryError: Java heap space
出现的原因一般是
- 数据突增。比如突然创建了大对象,超出了最大堆空间内存,可能还来不及回收,也可能根本就无法满足。
- 对象堆积。一般是程序编码有问题,导致创建的对象一直堆积在堆内存,无法被GC探测回收。
永久代溢出
java.lang.OutOfMemoryError: PermGen space
元空间溢出
java.lang.OutOfMemoryError: Metaspace
元空间的概念是在jdk1.8提出来的,用来取代以前的永久代。永久代
遇到这种问题,冷静,接着一步步校验
查看jvm启动参数
java -server -Xmx512M -Xms512M -Denv=FAT -XX:+UseCodeCacheFlushing -XX:+HeapDumpOnOutOfMemoryError -Xloggc:gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:-OmitStackTraceInFastThrow -jar /usr/local/application/**.jar
可以看出,启动参数限制了最大堆内存是515M,因为是测试环境,部署了很多个项目,保险起见设置的,平时也都正常。
那就是说调大最大堆内存就可以,接下来试一下把最大堆内存调整为1G。
更改启动参数,本地运行后,仍然会报错
呃。。。。
查看VisualVm
这时候打开VisualVm看看,可以看到设置的最大堆大小在1000MB,而已使用的堆内存大小才100多MB,此时能够笃定是创建了大对象而导致的内存溢出。
断点调试
这一步开始来断点,排查大对象从哪里来,此时查看报错的源码,发现确实是因为大对象的创建导致
代码在com.rabbitmq.client.impl.Frame
类中,Frame是指AMQP协议层面的通信帧。
对于Frame的理解,可以查看其它博客:https://blog.csdn.net/usagoole/article/details/83048009
从上图可以看到,输入流读取的字节数为1345270062,这时候即创建了一个大小为1345270062(1.2G)的字节数组,于是乎出现内存溢出。
至于为什么会突然读取到这么大的字节数,重新调试,我把断点打在com.rabbitmq.client.impl.SocketFrameHandler
系统有两个MQ,原有的MQ一切正常,从支付回调MQ开始,就开始报错了,所以初步怀疑是这个MQ账号的问题,或许是账号不对?没有远程登录的权限?
理解源码
Rabbitmq是基于socket连接读取的输入流,再将它转成字节数组。
先熟悉一下com.rabbitmq.client.impl.Frame
帧(Frame),AMQP协议层面的通信帧
上图从左到右依次为帧类型、通道编号、帧大小、内容、结束标记组成一个帧
从上面调试的代码可以看出,我们是打算取出payload这一段内容时,超出了长度。
再看看以下代码,
readInt()的作用是,读取四个输入字节,并做了位移运算,返回一个整型值。
一个int存储的是32位的整型数据,32bit = 4 * 1byte,即表明每次从输入流里读取4个字节的数据;
int payloadSize = is.readInt();
public final int readInt() throws IOException {
int ch1 = in.read();
int ch2 = in.read();
int ch3 = in.read();
int ch4 = in.read();
if ((ch1 | ch2 | ch3 | ch4) < 0)
throw new EOFException();
return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
}
断点可以看出,返回的整型值,也就是payload的长度,达到了1345270062,这样下一步创建byte对象的时候,就出现内存溢出的事故。
但是为什么会出现这个大对象,回过头去分析readInt()
,in.read()
将16进制的网络字节码 转为10进制的数组,正
是因为读取的数据有问题,才导致位移运算后得到一个比较大的整型值。
抓包
围绕着上面这个问题,此时需要抓个包看看,采取的是边断点边抓包的方式。
打开抓包工具,过滤器设置指定ip为MQ的host
-
先断点到111行,接着启动程序
-
当打到该断点的时候,看到帧大小比较大的时候,进入readInt()
可以看到此时读取的4个数值分别是80、47、49、46,由于是网络字节码转过来的,故转为16进制后,对应为
```
DEC:80 47 49 46
HEX:50 2F 31 2E
```
- 查看抓包
从抓包可以看到,字节码对上了,而且看到响应码为400,Bad Request!!!
这也验证了一开始提到的猜测:MQ账号有问题,于是咨询了中台组,最终发现,是因为1.0部门给的端口有问题,导致socket无法连接!
分析的过程非常有趣,虽然结果很狗血。。