博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot rabbitmq对消费者和生产者丢消息设置
阅读量:4028 次
发布时间:2019-05-24

本文共 3147 字,大约阅读时间需要 10 分钟。

 

首先讲解防止生产者丢失信息处理。

1、设置发送消息确认: publisher-confirms: true

2、启动消息失败返回: publisher-returns: true

在yml中配置如下

spring:

rabbitmq:  host: localhost  port: 5672  virtual-host: mall  username: guest  password: guest  publisher-confirms: true #如果对异步消息需要回调必须设置为true  publisher-returns: true  listener:    direct:      acknowledge-mode: manual    simple:      retry:        enabled: true        max-attempts: 3        initial-interval: 2000      acknowledge-mode: manual      default-requeue-rejected: false  template:    mandatory: true # 触发returnedMessage回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调

在文件中增加

@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {    RabbitTemplate rabbitTemplate = new RabbitTemplate();    rabbitTemplate.setConnectionFactory(connectionFactory);    rabbitTemplate.setChannelTransacted(false);    rabbitTemplate.setMandatory(true);    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {        @Override        public void confirm(CorrelationData correlationData, boolean ack, String cause) {            if(ack){                log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);            }else{                System.out.print("----");            }        }    });    rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {        @Override        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {            message.getMessageProperties().setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE);            log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);        }    });    return rabbitTemplate;}

此两步可以解决生产者丢失消息问题。

 

对于消费者丢失消息

设置手动确认。在消费代码中try..catch.然后进行手动确认

 

@Component@RabbitListener(queues = "mall.order.cancel.ttl")public class CancelOrderReceiver {    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);    /**     * 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK     * 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易就吧磁盘空间耗完     * 解决方案:手动ACK,或者try-catch 然后在 catch 里面将错误的消息转移到其它的系列中去     * spring.rabbitmq.listener.simple.acknowledge-mode = manual     */    @RabbitHandler    public void cfgUserReceiveDealy(Long orderId, Message message, Channel channel) throws IOException {        LOGGER.info("===============接收队列接收消息====================");        LOGGER.info("接收时间:{},接受内容:{}", LocalDateTime.now(), orderId.toString(),message.getMessageProperties());        //通知 MQ 消息已被接收,可以ACK(从队列中删除)了        boolean isAck = true;        try {            int f = 1/0;        } catch (Exception e) {                    isAck = false;            LOGGER.error("============消费失败,尝试消息补发再次消费!==============");            LOGGER.error(e.getMessage());            //这里最后一个参数是是否进入队列。我这里用法是死信队列所以必须设置为false。才能触发死信            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);        }        if(isAck){            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);        }    }}

 

下节讲解死信队列设置

转载地址:http://xlvbi.baihongyu.com/

你可能感兴趣的文章
No.147 - LeetCode1108
查看>>
No.148 - LeetCode771
查看>>
No.174 - LeetCode1305 - 合并两个搜索树
查看>>
No.175 - LeetCode1306
查看>>
No.176 - LeetCode1309
查看>>
FE:http状态码
查看>>
No.182 - LeetCode1325 - C指针的魅力
查看>>
mac:移动python包路径
查看>>
mysql:sql create database新建utf8mb4 数据库
查看>>
mysql:sql alter database修改数据库字符集
查看>>
mysql:sql alter table 修改列属性的字符集
查看>>
mysql:sql drop table (删除表)
查看>>
mysql:sql truncate (清除表数据)
查看>>
scrapy:xpath string(.)非常注意问题
查看>>
yuv to rgb 转换失败呀。天呀。谁来帮帮我呀。
查看>>
yuv420 format
查看>>
单纯的把Y通道提取出来能正确显示出灰度图来为什么我的Qt就显示不出来呢转换有问题呀?
查看>>
YUV420只绘制Y通道
查看>>
yuv420 还原为RGB图像
查看>>
LED恒流驱动芯片
查看>>