哈喽大家好,我是阿Q! 上文,我们已经完成了SpringBoot快速集成RabbitMQ的小Demo,本文我们来聊一下RabbitMQ为了防止消息丢失,增加的消息确认机制:生产者消息确认机制和消费者消息确认机制。确认机制 一、生产者消息确认机制在yml中增加配置信息spring:rabbitmq:确认消息已发送到交换机(Exchange)publisherconfirmtype:correlated确认消息已发送到队列(Queue)publisherreturns:true spring。rabbitmq。publisherconfirm新版本已被弃用,现在使用spring。rabbitmq。publisherconfirmtypecorrelated实现相同效果增加回调BeanpublicRabbitTemplatecreateRabbitTemplate(ConnectionFactoryconnectionFactory){RabbitTemplaterabbitTemplatenewRabbitTemplate();rabbitTemplate。setConnectionFactory(connectionFactory);设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数rabbitTemplate。setMandatory(true);rabbitTemplate。setConfirmCallback(newRabbitTemplate。ConfirmCallback(){Overridepublicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){System。out。println(ConfirmCallback:相关数据:correlationData);System。out。println(ConfirmCallback:确认情况:ack);System。out。println(ConfirmCallback:原因:cause);}});rabbitTemplate。setReturnsCallback(newRabbitTemplate。ReturnsCallback(){OverridepublicvoidreturnedMessage(ReturnedMessagereturned){System。out。println(ReturnCallback:消息:returned。getMessage());System。out。println(ReturnCallback:回应码:returned。getReplyCode());System。out。println(ReturnCallback:回应信息:returned。getReplyText());System。out。println(ReturnCallback:交换机:returned。getExchange());System。out。println(ReturnCallback:路由键:returned。getRoutingKey());}});returnrabbitT}confirm机制是只保证消息到达exchange,并不保证消息可以路由到正确的queue当前的exchange不存在或者指定的路由key路由不到才会触发return机制 大家可以自行演示以下情况的执行结果:不存在交换机和队列存在交换机,不存在队列消息推送成功 二、消费者消息的确认机制 默认情况下如果一个消息被消费者正确接收则会从队列中移除。如果一个队列没被任何消费者订阅,那么这个队列中的消息会被缓存,当有消费者订阅时则会立即发送,进而从队列中移除。 消费者消息的确认机制可以分为以下3种:自动确认 AcknowledgeMode。NONE默认为自动确认,不管消费者是否成功处理了消息,消息都会从队列中被移除。根据情况确认 AcknowledgeMode。AUTO根据方法的执行情况来决定是否确认还是拒绝(是否重新入队列)如果消息成功被消费(成功的意思是在消费的过程中没有抛出异常),则自动确认当抛出AmqpRejectAndDontRequeueException异常的时候,则消息会被拒绝,且消息不会重回队列当抛出ImmediateAcknowledgeAmqpException异常,则消费者会被确认其他的异常,则消息会被拒绝,并且该消息会重回队列,如果此时只有一个消费者监听该队列,则有发生死循环的风险,多消费端也会造成资源的极大浪费,这个在开发过程中一定要避免的。可以通过setDefaultRequeueRejected(默认是true)去设置 可能造成消息丢失,一般是需要我们在trycatch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。手动确认 AcknowledgeMode。MANUAL对于手动确认,也是我们工作中最常用到的,它的用法如下:肯定确认deliveryTag:消息队列数据的唯一idmultiple:是否批量true:一次性确认所有小于等于deliveryTag的消息false:对当前消息进行确认;channel。basicAck(longdeliveryTag,booleanmultiple);否定确认multiple:是否批量true:一次性拒绝所有小于deliveryTag的消息false:对当前消息进行确认;requeue:被拒绝的是否重新入列,true:就是将数据重新丢回队列里,那么下次还会消费这消息;false:就是拒绝处理该消息,服务器把该消息丢掉即可。channel。basicNack(longdeliveryTag,booleanmultiple,booleanrequeue);用于否定确认,但与basicNack相比有一个限制,一次只能拒绝单条消息channel。basicReject(longdeliveryTag,booleanrequeue);手动确认 在yml配置中开启手动确认模式spring:rabbitmq:listener:simple:acknowledgemode:manual 或者在代码中开启ConfigurationpublicclassMessageListenerConfig{AutowiredprivateCachingConnectionFactoryconnectionFAutowiredprivateMQRecievermqR消息接收处理类BeanpublicSimpleMessageListenerContainersimpleMessageListenerContainer(){SimpleMessageListenerContainercontainernewSimpleMessageListenerContainer(connectionFactory);并发使用者的数量container。setConcurrentConsumers(1);消费者人数上限container。setMaxConcurrentConsumers(1);container。setAcknowledgeMode(AcknowledgeMode。MANUAL);RabbitMQ默认是自动确认,这里改为手动确认消息设置一个队列,此处支持设置多个container。setQueueNames(directQueue);container。setMessageListener(mqReciever);}} 消息消费类ComponentRabbitListener(queuesdirectQueue)监听队列名称publicclassMQRecieverimplementsChannelAwareMessageListener{OverridepublicvoidonMessage(Messagemessage,Channelchannel)throwsException{longdeliveryTagmessage。getMessageProperties()。getDeliveryTag();try{Stringmsgmessage。toString();String〔〕msgArraymsg。split();可以点进Message里面看源码,单引号直接的数据就是我们的map消息数据System。out。println(消费的消息内容:msgArray〔1〕);System。out。println(消费的主题消息来自:message。getMessageProperties()。getConsumerQueue());业务处理。。。。。。channel。basicAck(deliveryTag,true);}catch(Exceptione){拒绝重新入队列channel。basicReject(deliveryTag,false);e。printStackTrace();}}} 无ack:效率高,存在丢失大量消息的风险;有ack:效率低,不会丢消息。 作者:阿Q说代码 链接:https:juejin。cnpost7063259228898590750