RabbitMQ学习(二)


在之前的RabbitMQ中,讲了四种交换机的规则和用法,现在我们来说说RabbitMQ中的消息回调,也就是消息确认。

消息回调

只需要实现回调配置以及回调函数即可

  • 首先需要在配置文件中添加消息回调的配置项
# 消息确认配置项

# 确认消息已发送到交换机(Exchange)
# spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=simple
# 确认消息已经发送到队列(Queue)
spring.rabbitmq.publisher-returns=true
  • 配置相关的消息回调函数

在MQ配置类中实现回调函数:

/**
* 消息确认回调函数
*
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate();
    rabbitTemplate.setConnectionFactory(connectionFactory);
    // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
    rabbitTemplate.setMandatory(true);

    // 消息发送回调
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        System.out.println("ConfirmCallback:     " + "相关数据:" + correlationData);
        System.out.println("ConfirmCallback:     " + "确认情况:" + ack);
        System.out.println("ConfirmCallback:     " + "原因:" + cause);
    });

    // 消息接收回调
    rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
        System.out.println("ReturnCallback:     " + "消息:" + message);
        System.out.println("ReturnCallback:     " + "回应码:" + replyCode);
        System.out.println("ReturnCallback:     " + "回应信息:" + replyText);
        System.out.println("ReturnCallback:     " + "交换机:" + exchange);
        System.out.println("ReturnCallback:     " + "路由键:" + routingKey);
    });

    return rabbitTemplate;
}

RabbitMQ中消息的推送包含四种情况:

情况一: 消息推送到server,但是在server里找不到交换机 , 这个交换机是没有创建没有配置的

结论: 这种情况触发的是 ConfirmCallback 回调函数。

情况二:消息推送到server,找到交换机了,但是没找到队列

结论: 这种情况触发的是 ConfirmCallback 和 RetrunCallback 两个回调函数。

情况三:消息推送到sever,交换机和队列啥都没找到 (和情况一相类似)

结论: 这种情况触发的是 ConfirmCallback 回调函数。

情况四: 消息推送成功

结论: 这种情况触发的是 ConfirmCallback 回调函数。

消息确认机制

和生产者的消息确认机制不同,消费者本来就是在监听消息,符合条件的消息就会消费下来;所以,消息接收确认机制主要有是三种:

第一种:自动确认,这也是默认的消息确认情况。 AcknowledgeMode.AUTO

RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。

第二种:不确认,这个不做介绍。AcknowledgeMode.NONE

第三种:手动确认,使用最多的模式。AcknowledgeMode.MANUAL

basic.ack用于肯定确认
basic.nack用于否定确认(注意:这是AMQP 0-9-1的RabbitMQ扩展)
basic.reject用于否定确认,但与basic.nack相比有一个限制:一次只能拒绝单条消息
消费者端以上的3个方法都表示消息已经被正确投递,但是basic.ack表示消息已经被正确处理,但是basic.nack,basic.reject表示没有被正确处理,但是RabbitMQ中仍然需要删除这条消息。

  • 新建消息确认配置类
/**
 * 消费者消息确认机制处理<p>
 * 代码描述<p>
 * Copyright: Copyright (C) 2019 XXX, Inc. All rights reserved. <p>
 *
 * @author WQL
 * @since 2019年10月23日 0023 14:24
 */
@Configuration
public class MessageListenerConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     * 自己写的Direct消息接收处理类
     * 消费者类
     */
    @Autowired
    private Receiver receiver;

    /**
     * 自己写的MQ配置类(包含队列相关信息)
     */
    @Autowired
    private RabbitConfig rabbitConfig;

    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        // RabbitMQ默认是自动确认,这里改为手动确认消息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setQueues(rabbitConfig.helloQueue());
        container.setMessageListener(receiver);

        return container;
    }
}
  • 消费者消息确认代码
@Component
@RabbitListener(queues = "hello")
public class Receiver implements ChannelAwareMessageListener {

    /**
     * 该消费者者实现了对hello队列的消费
     * 消费操作为输出消息的字符内容
     *
     * @param message
     */
    /*@RabbitHandler
    public void process(Map message) {
        System.out.println("Receiver : " + message.toString());
    }*/
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            // 因为传递消息的时候用的map传递,所以将Map从Message内取出需要做些处理
            String msg = message.toString();
            // 可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据
            String[] msgArray = msg.split("'");
            Map<String, String> msgMap = mapStringToMap(msgArray[1].trim());
            String messageId = msgMap.get("messageId");
            String messageData = msgMap.get("messageData");
            String createTime = msgMap.get("createTime");
            System.out.println("messageId:" + messageId + "  messageData:" + messageData + "  createTime:" + createTime);
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            channel.basicReject(deliveryTag, false);
            e.printStackTrace();
        }
    }

    private Map<String, String> mapStringToMap(String str) {
        str = str.substring(1, str.length() - 1);
        String[] strs = str.split(",");
        Map<String, String> map = new HashMap<>();
        for (String string : strs) {
            String key = string.split("=")[0].trim();
            String value = string.split("=")[1];
            map.put(key, value);
        }
        return map;
    }
}

控制台出现messageId:f53a85b8-d6aa-4a2b-9ed2-f26ff7b95df3 messageData:test message! createTime:2019-10-23 16:40:31就说明手动确认消息设置成功。

文章代码:RabbitMQ


文章作者: Cody_
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Cody_ !
评论
 上一篇
Docker中安装Elasticsearch Docker中安装Elasticsearch
Docker给开发人员带来了极大的便利性;舍弃那些安装包和配置环境,拾起docker pull一把梭哈。 本文涉及到的所有版本号都是7.9.3 Docker镜像地址 安装Elasticsearch拉取镜像命令 docker pull ela
2020-12-12
下一篇 
JAVA8内置函数式接口 JAVA8内置函数式接口
Lambda表达式与函数式接口的关系: Lambda是函数式接口实现的快捷方式,它相当于函数式接口实现的实例,因为在方法中可以使用Object作为参数,所以把Lambda表达式作为方法参数也不是不可以。 函数式接口只有一个抽象方法,并且没有
2019-10-27
  目录