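MessageConverter: the message converter
When we send a message, the message body normally travels as binary data. If we want the framework to convert it for us, or want to plug in our own conversion logic, we need a MessageConverter.
A custom converter generally implements the MessageConverter interface and overrides the following two methods:
toMessage: converts a Java object into a Message
fromMessage: converts a Message into a Java object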
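The two-method contract above can be pictured with a small, dependency-free sketch. SketchMessage, SketchConverter and Utf8TextConverter are hypothetical stand-ins invented for illustration; they are not Spring AMQP's Message or MessageConverter types, and the "contentType" header key is likewise an assumption.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical stand-in for a message: a binary body plus headers.
final class SketchMessage {
    final byte[] body;
    final Map<String, Object> headers;

    SketchMessage(byte[] body, Map<String, Object> headers) {
        this.body = body;
        this.headers = headers;
    }
}

// Hypothetical stand-in for the two-method converter contract.
interface SketchConverter {
    SketchMessage toMessage(Object object, Map<String, Object> headers);

    Object fromMessage(SketchMessage message);
}

// Converts between a String and its UTF-8 bytes.
final class Utf8TextConverter implements SketchConverter {
    @Override
    public SketchMessage toMessage(Object object, Map<String, Object> headers) {
        // Record what the body contains so the receiving side can pick a converter.
        headers.put("contentType", "text/plain");
        byte[] body = String.valueOf(object).getBytes(StandardCharsets.UTF_8);
        return new SketchMessage(body, headers);
    }

    @Override
    public Object fromMessage(SketchMessage message) {
        return new String(message.body, StandardCharsets.UTF_8);
    }
}
```

Naming the charset explicitly on both sides keeps the round trip independent of the platform default encoding.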
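Converter types:
JSON converter: Jackson2JsonMessageConverter converts between JSON message bodies and Java objects
DefaultJackson2JavaTypeMapper: a type mapper that decides which Java class a message is mapped to
Custom binary converters: for example images, PDF, PPT, streaming media
Code example:
Code: https://github.com/hmilyos/rabbitmqdemo.git, under the rabbitmq-api project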
1. JSON converter
First create a Java entity, Order:
public class Order {
    private String id;
    private String name;
    private String content;

    public Order() {
    }

    public Order(String id, String name, String content) {
        this.id = id;
        this.name = name;
        this.content = content;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "Order{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", content='" + content + '\'' +
                '}';
    }
}
Next, in the RabbitMQConfig from the previous article, configure a converter that supports JSON:
@Bean // the parameter name connectionFactory must match the name of the bean method defined at the top of the class
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue001(), queue002(), queue003()); // queues to listen on
    container.setConcurrentConsumers(1); // initial number of consumers
    container.setMaxConcurrentConsumers(5); // maximum number of consumers
    container.setDefaultRequeueRejected(false); // do not requeue rejected messages
    container.setAcknowledgeMode(AcknowledgeMode.AUTO); // acknowledge mode
    container.setExposeListenerChannel(true);
    container.setConsumerTagStrategy(new ConsumerTagStrategy() { // consumer tag strategy
        @Override
        public String createConsumerTag(String queue) {
            return queue + "_" + UUID.randomUUID().toString();
        }
    });
    // 3 converter with JSON support
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
    return container;
}
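In the delegate class used by the adapter (MessageDelegate), declare a consumer method whose parameter is a Map:
public void consumeMessage(Map messageBody) {
    log.info("map method, message content: " + messageBody);
}
That completes the feature. Next, write a unit test. Note that the ContentType must be application/json!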
@Test
public void testSendJsonMessage() throws Exception {
    Order order = new Order();
    order.setId("001");
    order.setName("test1001 order message");
    order.setContent("test1001 order description");
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    log.info("order 4 json: " + json);
    MessageProperties messageProperties = new MessageProperties();
    // the contentType must be changed to application/json here
    messageProperties.setContentType("application/json");
    // encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) instead of the platform default charset
    Message message = new Message(json.getBytes(StandardCharsets.UTF_8), messageProperties);
    rabbitTemplate.send("topic001", "spring.order", message);
}
Run the unit test and the message is consumed.
2. Converting to Java objects with DefaultJackson2JavaTypeMapper
Change messageContainer to the following:
@Bean // the parameter name connectionFactory must match the name of the bean method defined at the top of the class
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue001(), queue002(), queue003()); // queues to listen on
    container.setConcurrentConsumers(1); // initial number of consumers
    container.setMaxConcurrentConsumers(5); // maximum number of consumers
    container.setDefaultRequeueRejected(false); // do not requeue rejected messages
    container.setAcknowledgeMode(AcknowledgeMode.AUTO); // acknowledge mode
    container.setExposeListenerChannel(true);
    container.setConsumerTagStrategy(new ConsumerTagStrategy() { // consumer tag strategy
        @Override
        public String createConsumerTag(String queue) {
            return queue + "_" + UUID.randomUUID().toString();
        }
    });
    // 4 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter: conversion to Java objects
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
    return container;
}
public void consumeMessage(Order order) {
    log.info("order object, message content, id: " + order.getId() +
            ", name: " + order.getName() +
            ", content: " + order.getContent());
}
@Test
public void testSendJavaMessage() throws Exception {
    Order order = new Order();
    order.setId("1002");
    order.setName("test1002 order message");
    order.setContent("test1002 order description");
    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    log.info("order java: " + json);
    MessageProperties messageProperties = new MessageProperties();
    // the contentType must be changed to application/json here
    messageProperties.setContentType("application/json");
    // use the fully qualified name of your own entity class here
    messageProperties.getHeaders().put("__TypeId__", "com.hmily.rabbitmqapi.spring.domain.Order");
    // encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) instead of the platform default charset
    Message message = new Message(json.getBytes(StandardCharsets.UTF_8), messageProperties);
    rabbitTemplate.send("topic001", "spring.order", message);
}
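Run the unit test.
Error note: you may see an exception with the message "If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*)."
This is a safety check raised while the bytes are being converted into a Java object: the class named in the __TypeId__ header is not in a package the type mapper trusts. A blunt fix is as follows:
Create a class EnableAllJackson2JavaTypeMapper that extends DefaultJackson2JavaTypeMapper, and configure in it which classes may be converted; here I simply allow all of them.
Then, in the SimpleMessageListenerContainer configuration above, change new DefaultJackson2JavaTypeMapper() to new EnableAllJackson2JavaTypeMapper() and the error goes away.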
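The idea behind the check can be sketched without any Spring dependency. The TrustedPackages class below is a hypothetical, simplified illustration and not Spring AMQP's implementation; in Spring AMQP the corresponding setting is, to the best of my knowledge, the setTrustedPackages method of DefaultJackson2JavaTypeMapper, which a subclass such as EnableAllJackson2JavaTypeMapper could call with "*".

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Simplified illustration of a trusted-package check (hypothetical class).
final class TrustedPackages {
    private final Set<String> trusted;

    TrustedPackages(String... packages) {
        this.trusted = new HashSet<>(Arrays.asList(packages));
    }

    // Returns true if a class with this fully qualified name may be deserialized.
    boolean isTrusted(String className) {
        if (trusted.contains("*")) {
            return true; // "trust all": the blunt option
        }
        int lastDot = className.lastIndexOf('.');
        String packageName = lastDot < 0 ? "" : className.substring(0, lastDot);
        return trusted.contains(packageName);
    }
}
```

Trusting "*" is convenient for a demo. When messages can come from sources you do not control, listing only your own domain package (here com.hmily.rabbitmqapi.spring.domain) is the narrower choice.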
Now extend the code so that it converts two kinds of Java objects:
public class Packaged {
    private String id;
    private String name;
    private String description;

    public Packaged() {
    }

    public Packaged(String id, String name, String description) {
        this.id = id;
        this.name = name;
        this.description = description;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    @Override
    public String toString() {
        return "Packaged{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", description='" + description + '\'' +
                '}';
    }
}
Change messageContainer to the following:
@Bean // the parameter name connectionFactory must match the name of the bean method defined at the top of the class
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue001(), queue002(), queue003()); // queues to listen on
    container.setConcurrentConsumers(1); // initial number of consumers
    container.setMaxConcurrentConsumers(5); // maximum number of consumers
    container.setDefaultRequeueRejected(false); // do not requeue rejected messages
    container.setAcknowledgeMode(AcknowledgeMode.AUTO); // acknowledge mode
    container.setExposeListenerChannel(true);
    container.setConsumerTagStrategy(new ConsumerTagStrategy() { // consumer tag strategy
        @Override
        public String createConsumerTag(String queue) {
            return queue + "_" + UUID.randomUUID().toString();
        }
    });
    // 5 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter: mapping to multiple Java classes
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
    // alias -> class; the alias is what the sender puts into the __TypeId__ header
    Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
    idClassMapping.put("order", com.hmily.rabbitmqapi.spring.domain.Order.class);
    idClassMapping.put("packaged", com.hmily.rabbitmqapi.spring.domain.Packaged.class);
    javaTypeMapper.setIdClassMapping(idClassMapping);
    jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
    adapter.setMessageConverter(jackson2JsonMessageConverter);
    container.setMessageListener(adapter);
    return container;
}
public void consumeMessage(Packaged pack) {
    log.info("Packaged object, message content, id: " + pack.getId() +
            ", name: " + pack.getName() +
            ", description: " + pack.getDescription());
}
@Test
public void testSendMappingMessage() throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    Order order = new Order();
    order.setId("1001");
    order.setName("1001 order message");
    order.setContent("1001 order description");
    String json1 = mapper.writeValueAsString(order);
    log.info("order java: " + json1);
    MessageProperties messageProperties1 = new MessageProperties();
    // the contentType must be changed to application/json here
    messageProperties1.setContentType("application/json");
    messageProperties1.getHeaders().put("__TypeId__", "order");
    // encode explicitly as UTF-8 (java.nio.charset.StandardCharsets) instead of the platform default charset
    Message message1 = new Message(json1.getBytes(StandardCharsets.UTF_8), messageProperties1);
    rabbitTemplate.send("topic001", "spring.order", message1);

    Packaged pack = new Packaged();
    pack.setId("1002");
    pack.setName("1002 package message");
    pack.setDescription("1002 package description");
    String json2 = mapper.writeValueAsString(pack);
    log.info("pack java: " + json2);
    MessageProperties messageProperties2 = new MessageProperties();
    // the contentType must be changed to application/json here
    messageProperties2.setContentType("application/json");
    messageProperties2.getHeaders().put("__TypeId__", "packaged");
    Message message2 = new Message(json2.getBytes(StandardCharsets.UTF_8), messageProperties2);
    rabbitTemplate.send("topic001", "spring.pack", message2);
}
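Note that the __TypeId__ header no longer holds the fully qualified class name; it holds the alias registered above.
Because the JUnit test shuts down as soon as it has finished sending, one of the messages is left unconsumed.
You can confirm this in the management console.
Now start the project (RabbitmqApiApplication) directly, and the remaining message is consumed.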
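The alias lookup used in this section can be pictured as a map consulted before falling back to a class name. The TypeIdResolver below is a hypothetical, simplified sketch, not the library's code; java.util.ArrayList stands in for the Order entity so that the block compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified illustration of resolving a __TypeId__ value to a class (hypothetical class).
final class TypeIdResolver {
    private final Map<String, Class<?>> idClassMapping = new HashMap<>();

    void register(String alias, Class<?> type) {
        idClassMapping.put(alias, type);
    }

    // Registered aliases win; anything else is treated as a fully qualified class name.
    Class<?> resolve(String typeId) {
        Class<?> mapped = idClassMapping.get(typeId);
        if (mapped != null) {
            return mapped;
        }
        try {
            return Class.forName(typeId);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Unknown type id: " + typeId, e);
        }
    }
}
```

With aliases, the sender and the receiver only have to agree on short names such as "order" and "packaged", so a class can be moved to another package without changing the messages on the wire.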
3. Binary converters
First write the conversion logic:
public class ImageMessageConverter implements MessageConverter {
    private static final Logger log = LoggerFactory.getLogger(ImageMessageConverter.class);

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        // this converter is only used on the consuming side
        throw new MessageConversionException("toMessage is not supported by ImageMessageConverter");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        log.info("-----------Image MessageConverter----------");
        // the file extension comes from the extName header and defaults to png
        Object _extName = message.getMessageProperties().getHeaders().get("extName");
        String extName = _extName == null ? "png" : _extName.toString();
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "G:/test/file/new/" + fileName + "." + extName;
        File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            // report the failure instead of returning a file that was never written
            throw new MessageConversionException("Failed to write image file " + path, e);
        }
        return f;
    }
}
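The file-writing step of such a converter can be tried out in isolation. The sketch below uses only the JDK; BinaryBodyWriter, its save method and the defaultExt parameter are hypothetical names introduced for illustration, while the extName header key is the one used above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.UUID;

// Writes a binary message body to a uniquely named file (hypothetical helper).
final class BinaryBodyWriter {
    // The extension is taken from the "extName" header, or defaultExt if the header is absent.
    static Path save(byte[] body, Map<String, Object> headers, Path dir, String defaultExt) {
        Object ext = headers.get("extName");
        String extName = ext == null ? defaultExt : ext.toString();
        Path target = dir.resolve(UUID.randomUUID() + "." + extName);
        try {
            Files.createDirectories(dir); // make sure the target directory exists
            return Files.write(target, body);
        } catch (IOException e) {
            // surface the failure to the caller instead of swallowing it
            throw new UncheckedIOException("Could not write " + target, e);
        }
    }
}
```

Passing the target directory in as a parameter, rather than hard-coding a drive path, makes the same logic usable on any machine and in tests.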
Receiving the message:
public void consumeMessage(File file) {
    log.info("file object method, message content: " + file.getName());
}
Declare a global converter:
@Bean // the parameter name connectionFactory must match the name of the bean method defined at the top of the class
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue001(), queue002(), queue003()); // queues to listen on
    container.setConcurrentConsumers(1); // initial number of consumers
    container.setMaxConcurrentConsumers(5); // maximum number of consumers
    container.setDefaultRequeueRejected(false); // do not requeue rejected messages
    container.setAcknowledgeMode(AcknowledgeMode.AUTO); // acknowledge mode
    container.setExposeListenerChannel(true);
    container.setConsumerTagStrategy(new ConsumerTagStrategy() { // consumer tag strategy
        @Override
        public String createConsumerTag(String queue) {
            return queue + "_" + UUID.randomUUID().toString();
        }
    });
    // 6 ext convert
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    // global converter: picks a delegate according to the message's contentType
    ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();

    TextMessageConverter textConvert = new TextMessageConverter();
    convert.addDelegate("text", textConvert);
    convert.addDelegate("html/text", textConvert);
    convert.addDelegate("xml/text", textConvert);
    convert.addDelegate("text/plain", textConvert);

    Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
    convert.addDelegate("json", jsonConvert);
    convert.addDelegate("application/json", jsonConvert);

    ImageMessageConverter imageConverter = new ImageMessageConverter();
    convert.addDelegate("image/png", imageConverter);
    convert.addDelegate("image", imageConverter);

    PDFMessageConverter pdfConverter = new PDFMessageConverter();
    convert.addDelegate("application/pdf", pdfConverter);

    adapter.setMessageConverter(convert);
    container.setMessageListener(adapter);
    return container;
}
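Delegating by content type boils down to a lookup table from content type to converter, plus a fallback for unregistered types. The ContentTypeDispatcher below is a hypothetical, JDK-only sketch of that idea; it is not how ContentTypeDelegatingMessageConverter is implemented, and plain functions stand in for the converters.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Simplified illustration of choosing a converter by content type (hypothetical class).
final class ContentTypeDispatcher {
    private final Map<String, Function<byte[], Object>> delegates = new HashMap<>();
    private final Function<byte[], Object> fallback;

    ContentTypeDispatcher(Function<byte[], Object> fallback) {
        this.fallback = fallback;
    }

    void addDelegate(String contentType, Function<byte[], Object> delegate) {
        delegates.put(contentType, delegate);
    }

    // Uses the delegate registered for this exact content type, or the fallback if there is none.
    Object convert(String contentType, byte[] body) {
        return delegates.getOrDefault(contentType, fallback).apply(body);
    }
}
```

Because the lookup in this sketch is an exact string match, registering the same delegate under several keys, as the configuration above does for text and JSON, is how one converter ends up serving several content types.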
Write a unit test that sends an image:
@Test
public void testSendExtConverterMessage() throws Exception {
    byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "1001.png"));
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("image/png");
    messageProperties.getHeaders().put("extName", "png");
    Message message = new Message(body, messageProperties);
    rabbitTemplate.send("", "image_queue", message);
}
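The original image is in the local folder G:/test/file.
After the test runs, the image is written to the target directory, G:/test/file/new/.
Now let's try a PDF: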
public class PDFMessageConverter implements MessageConverter {
    private static final Logger log = LoggerFactory.getLogger(PDFMessageConverter.class);

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        // this converter is only used on the consuming side
        throw new MessageConversionException("toMessage is not supported by PDFMessageConverter");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        log.info("-----------PDF MessageConverter----------");
        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        String path = "G:/test/file/new/" + fileName + ".pdf";
        File f = new File(path);
        try {
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            // report the failure instead of returning a file that was never written
            throw new MessageConversionException("Failed to write PDF file " + path, e);
        }
        return f;
    }
}
@Test
public void testSendExtConverterMessage() throws Exception {
    // byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "1001.png"));
    // MessageProperties messageProperties = new MessageProperties();
    // messageProperties.setContentType("image/png");
    // messageProperties.getHeaders().put("extName", "png");
    // Message message = new Message(body, messageProperties);
    // rabbitTemplate.send("", "image_queue", message);

    byte[] body = Files.readAllBytes(Paths.get("G:/test/file", "AliTech101_RD.pdf"));
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setContentType("application/pdf");
    Message message = new Message(body, messageProperties);
    rabbitTemplate.send("", "pdf_queue", message);
}
Finally, check that the PDF was processed successfully.