本文共 3147 字,大约阅读时间需要 10 分钟。
spring:
rabbitmq: host: localhost port: 5672 virtual-host: mall username: guest password: guest publisher-confirms: true #如果对异步消息需要回调必须设置为true publisher-returns: true listener: direct: acknowledge-mode: manual simple: retry: enabled: true max-attempts: 3 initial-interval: 2000 acknowledge-mode: manual default-requeue-rejected: false template: mandatory: true # 触发returnedMessage回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调
@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setChannelTransacted(false); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause); }else{ System.out.print("----"); } } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { message.getMessageProperties().setDeliveryMode(MessageProperties.DEFAULT_DELIVERY_MODE); log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }); return rabbitTemplate;}
此两步可以解决生产者丢失消息问题。
设置手动确认。在消费代码中try..catch.然后进行手动确认
@Component@RabbitListener(queues = "mall.order.cancel.ttl")public class CancelOrderReceiver { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class); /** * 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK * 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易就吧磁盘空间耗完 * 解决方案:手动ACK,或者try-catch 然后在 catch 里面将错误的消息转移到其它的系列中去 * spring.rabbitmq.listener.simple.acknowledge-mode = manual */ @RabbitHandler public void cfgUserReceiveDealy(Long orderId, Message message, Channel channel) throws IOException { LOGGER.info("===============接收队列接收消息===================="); LOGGER.info("接收时间:{},接受内容:{}", LocalDateTime.now(), orderId.toString(),message.getMessageProperties()); //通知 MQ 消息已被接收,可以ACK(从队列中删除)了 boolean isAck = true; try { int f = 1/0; } catch (Exception e) { isAck = false; LOGGER.error("============消费失败,尝试消息补发再次消费!=============="); LOGGER.error(e.getMessage()); //这里最后一个参数是是否进入队列。我这里用法是死信队列所以必须设置为false。才能触发死信 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); } if(isAck){ channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } }}
转载地址:http://xlvbi.baihongyu.com/