在之前的RabbitMQ中,讲了四种交换机的规则和用法,现在我们来说说RabbitMQ中的消息回调,也就是消息确认。
消息回调
只需要实现回调配置以及回调函数即可
- 首先需要在配置文件中添加消息回调的配置项
# 消息确认配置项
# 确认消息已发送到交换机(Exchange)
# spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=simple
# 确认消息已经发送到队列(Queue)
spring.rabbitmq.publisher-returns=true
- 配置相关的消息回调函数
在MQ配置类中实现回调函数:
/**
* 消息确认回调函数
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
// 消息发送回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("ConfirmCallback: " + "相关数据:" + correlationData);
System.out.println("ConfirmCallback: " + "确认情况:" + ack);
System.out.println("ConfirmCallback: " + "原因:" + cause);
});
// 消息接收回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
System.out.println("ReturnCallback: " + "消息:" + message);
System.out.println("ReturnCallback: " + "回应码:" + replyCode);
System.out.println("ReturnCallback: " + "回应信息:" + replyText);
System.out.println("ReturnCallback: " + "交换机:" + exchange);
System.out.println("ReturnCallback: " + "路由键:" + routingKey);
});
return rabbitTemplate;
}
RabbitMQ中消息的推送包含四种情况:
情况一: 消息推送到server,但是在server里找不到交换机 , 这个交换机是没有创建没有配置的
结论: 这种情况触发的是 ConfirmCallback 回调函数。
情况二:消息推送到server,找到交换机了,但是没找到队列
结论: 这种情况触发的是 ConfirmCallback 和 RetrunCallback 两个回调函数。
情况三:消息推送到sever,交换机和队列啥都没找到 (和情况一相类似)
结论: 这种情况触发的是 ConfirmCallback 回调函数。
情况四: 消息推送成功
结论: 这种情况触发的是 ConfirmCallback 回调函数。
消息确认机制
和生产者的消息确认机制不同,消费者本来就是在监听消息,符合条件的消息就会消费下来;所以,消息接收确认机制主要有是三种:
第一种:自动确认,这也是默认的消息确认情况。 AcknowledgeMode.AUTO
RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
第二种:不确认,这个不做介绍。AcknowledgeMode.NONE
第三种:手动确认,使用最多的模式。AcknowledgeMode.MANUAL
basic.ack用于肯定确认
basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理,但是basic.nack,basic.reject表示没有被正确处理,但是RabbitMQ中仍然需要删除这条消息。
- 新建消息确认配置类
/**
* 消费者消息确认机制处理<p>
* 代码描述<p>
* Copyright: Copyright (C) 2019 XXX, Inc. All rights reserved. <p>
*
* @author WQL
* @since 2019年10月23日 0023 14:24
*/
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 自己写的Direct消息接收处理类
* 消费者类
*/
@Autowired
private Receiver receiver;
/**
* 自己写的MQ配置类(包含队列相关信息)
*/
@Autowired
private RabbitConfig rabbitConfig;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setQueues(rabbitConfig.helloQueue());
container.setMessageListener(receiver);
return container;
}
}
- 消费者消息确认代码
@Component
@RabbitListener(queues = "hello")
public class Receiver implements ChannelAwareMessageListener {
/**
* 该消费者者实现了对hello队列的消费
* 消费操作为输出消息的字符内容
*
* @param message
*/
/*@RabbitHandler
public void process(Map message) {
System.out.println("Receiver : " + message.toString());
}*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理
String msg = message.toString();
// 可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据
String[] msgArray = msg.split("'");
Map<String, String> msgMap = mapStringToMap(msgArray[1].trim());
String messageId = msgMap.get("messageId");
String messageData = msgMap.get("messageData");
String createTime = msgMap.get("createTime");
System.out.println("messageId:" + messageId + " messageData:" + messageData + " createTime:" + createTime);
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
private Map<String, String> mapStringToMap(String str) {
str = str.substring(1, str.length() - 1);
String[] strs = str.split(",");
Map<String, String> map = new HashMap<>();
for (String string : strs) {
String key = string.split("=")[0].trim();
String value = string.split("=")[1];
map.put(key, value);
}
return map;
}
}
控制台出现messageId:f53a85b8-d6aa-4a2b-9ed2-f26ff7b95df3 messageData:test message! createTime:2019-10-23 16:40:31
就说明手动确认消息设置成功。
文章代码:RabbitMQ