链路压测是一种常见的压测手段,可以测试出系统,链路的性能瓶颈在哪。大公司基本都有根据自己的业务开发的整套链路压测的产品。但是基本没有开源出来,技术细节都是没有的,只是有文章介绍他们的场景和解决方案。本人最近也参与了一个链路压测的项目,把这个项目中的遇到的一些问题和解决写出来,希望给到有需要的人,技术方案并不复杂。
链路压测的操作方式有2中
- 可以部署一套跟生产一样的服务,然后录制线上流量到模拟环境回放
- 另一种就是直接对线上服务进行压测,但是不能污染到线上的数据
这里只讨论第二种方式涉及到的问题
- 既然是链路,就有上下游依赖的服务,就需要把压测请求一直传递下去,这叫透传
- 识别到压测请求后数据插到影子库,影子mongo(mongodb影子库动态切换),影子redis,mq 消息也要标记识别
说到mq,消息的标记识别需要在发送端跟消费端做处理,也分2种方式
- 要么让压测请求跟正常请求都进入到生产的服务器的队列,压测消息发送端加上标记,接收端加上识别。
- 另一个就是压测的消息发送放到另一个影子队列里面,跟生产的队列完全隔离开,消费端监听生产对列跟影子队列,接收的拦截里面判断是哪个队列发过来的消息,做对应处理
这里针对的都是注解方式使用的rabbitmq,不包括那些硬编码使用mq发送接收的
消息接收
@RabbitListener(queues = MqConstant.QUEUE)
public final void onMessage(Message message, Channel channel) throws Exception{
String msg = new String(message.getBody());
System.out.println("是否是压测消息: " + HeaderThreadLocal.isTestRequest());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
消息发送
public void sendMq(){
rabbitTemplate.convertAndSend(MqConstant.EXCHANGE, MqConstant.ROUT_KEY, "message-body-" + UUID.randomUUID().toString());
}
1 先来说说第一种
消息的标记识别实际上就是对发送接收做一个拦截处理,配置2个bean,在bean 的方法里面做拦截的逻辑就可以了
@Bean(name = "rabbitListenerContainerFactory")
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener", name = "type", havingValue = "simple", matchIfMissing = true)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//消息接收之前加拦截处理,每次接收消息都会调用,是有压测消息标记的,先存到副本变量,后续的操作数据库根据这个变量进行切换影子库
factory.setAfterReceivePostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map header = message.getMessageProperties().getHeaders();
//判断是哪个队列的消息,影子队列的话要动态切换影子库跟后续操作
String queue = message.getMessageProperties().getConsumerQueue();
if (header.containsKey("test")){
HeaderThreadLocal.setIsTestRequest(true);
}
return message;
}
});
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(factory);
//发送之前加一个拦截器,每次发送都会调用这个方法,方法名称已经说明了一切了
rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
if (HeaderThreadLocal.isTestRequest()) {
//拦截逻辑就是如果是压测请求就加个header标记
message.getMessageProperties().getHeaders().put("test", true);
}
return message;
}
});
return rabbitTemplate;
}
RabbitTemplate 也有 setAfterReceivePostProcessors方法,但是这个方法对注解方式的接收消息是没用的,源码注释有说明只适用哪种接收消息的方式
2 接下来说说第二种
第二种也很简单,操作如下
1 在生产的mq服务器上面建好影子队列
2 用一个对应的影子 routing-key 做好影子队列与交换器的绑定
- 3 在发送消息的时候,如果是正常请求,就直接发送,压测请求就把发送的routing-key 加上一个后缀对应到影子routing-key
使用一个自定义的RabbitTemplate,重写里面的convertAndSend方法,每次调用都判断,是否要切换routing-key
public class CustomRabbitTemplate extends RabbitTemplate {
public CustomRabbitTemplate(ConnectionFactory connectionFactory) {
super(connectionFactory);
}
@Override
public void convertAndSend(String exchange,
String routingKey,
final Object object) throws AmqpException {
if (HeaderThreadLocal.isTestRequest()){
routingKey = routingKey +"-shadow";
}
super.convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}
}
@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory factory) {
CustomRabbitTemplate rabbitTemplate = new CustomRabbitTemplate(factory);
return rabbitTemplate;
}
- 4 消息消费的时候加上拦截,判断是否是影子队列的标记放一个到 ThreadLocal里面,后面的操作根据这个标记来写数据库或者其他
这一步参考这个bean 的配置 SimpleRabbitListenerContainerFactory
- 5 自动配置里面解析@RabbitListener(queues = MqConstant.QUEUE)这一段的逻辑自动加上影子队列的监听
找到这个bean RabbitListenerAnnotationBeanPostProcessor 里面的这个方法 resolveQueues
private String[] resolveQueues(RabbitListener rabbitListener) {
String[] queues = rabbitListener.queues();
//修改这里面的逻辑,加上这2行代码
String oldQueue = queues[0];
queues = new String[]{oldQueue, oldQueue+"-shadow"};
QueueBinding[] bindings = rabbitListener.bindings();
return result.toArray(new String[result.size()]);
}
那个bean 是spring的代码,这个方法是私有的,很多方法都是私有的,怎么改? AOP ? 我直接简单粗暴的把
SimpleRabbitListenerContainerFactory 的所有代码弄出来到一个新的子类里面,修改这个方法的逻辑,然后注入这个bean 就行了。
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
//这个类 CustomRabbitListenerAnnotationBeanPostProcessor 的代码时全部复制
//RabbitListenerAnnotationBeanPostProcessor的,只是加了2行代码
return new CustomRabbitListenerAnnotationBeanPostProcessor();
}
@Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
这样就可以自动加上影子队列的监听了,一顿操作下来,可以进行测试了,消息是可以动态指定发到对应的队列的