1自动配置1。1配置连接工厂publicclassRabbitAutoConfiguration{Configuration(proxyBeanMethodsfalse)ConditionalOnMissingBean(ConnectionFactory。class)protectedstaticclassRabbitConnectionFactoryCreator{BeanpublicCachingConnectionFactoryrabbitConnectionFactory(RabbitPropertiesproperties,ObjectProviderConnectionNameStrategyconnectionNameStrategy)throwsException{PropertyMappermapPropertyMapper。get();CachingConnectionFactoryfactorynewCachingConnectionFactory(getRabbitConnectionFactoryBean(properties)。getObject());设置属性值}privateRabbitConnectionFactoryBeangetRabbitConnectionFactoryBean(RabbitPropertiesproperties)throwsException{PropertyMappermapPropertyMapper。get();RabbitConnectionFactoryBeanfactorynewRabbitConnectionFactoryBean();。。。设置属性值factory。afterPropertiesSet();}}}1。2配置RabbitTemplatepublicclassRabbitAutoConfiguration{Configuration(proxyBeanMethodsfalse)Import(RabbitConnectionFactoryCreator。class)protectedstaticclassRabbitTemplateConfiguration{可自定义该类来自定义自己的实现BeanConditionalOnMissingBeanpublicRabbitTemplateConfigurerrabbitTemplateConfigurer(RabbitPropertiesproperties,ObjectProviderMessageConvertermessageConverter,ObjectProviderRabbitRetryTemplateCustomizerretryTemplateCustomizers){RabbitTemplateConfigurerconfigurernewRabbitTemplateConfigurer();。。。配置消息转换}BeanConditionalOnSingleCandidate(ConnectionFactory。class)ConditionalOnMissingBean(RabbitOperations。class)publicRabbitTemplaterabbitTemplate(RabbitTemplateConfigurerconfigurer,ConnectionFactoryconnectionFactory){RabbitTemplatetemplatenewRabbitTemplate();configurer。configure(template,connectionFactory);}}} 以上配置比较简单,都是一些基本的配置,配置Rabbit的连接工厂,配置Template,客户端操作的模版RabbitTemplate对象。2监听配置2。1监听容器配置消息监听的核心就在导入的类中Import(RabbitAnnotationDrivenConfiguration。class)publicclassRabbitAutoConfiguration{} 注解核心配置Configuration(proxyBeanMethodsfalse)ConditionalOnClass(EnableRabbit。class)classRabbitAnnotationDrivenConfiguration{privatefinalObjectProviderMessageConvertermessageCprivatefinalObjectProviderMessageRecoverermessageRprivatefinalObjectProviderRabbitRetryTemplateCustomizerretryTemplateCprivatefinalRabbitPRabbitAnnotationDrivenConfiguration(ObjectProviderMessageConvertermessageConverter,ObjectProviderMessageRecoverermessageRecoverer,ObjectProviderRabbitRetryTemplateCustomizerretryTemplateCustomizers,RabbitPropertiesproperties){this。messageConvertermessageCthis。messageRecoverermessageRthis。retryTemplateCustomizersretryTemplateCthis。}BeanConditionalOnMissingBeanSimpleRabbitListenerContainerFactoryConfigurersimpleRabbitListenerContainerFactoryConfigurer(){SimpleRabbitListenerContainerFactoryConfigurerconfigurernewSimpleRabbitListenerContainerFactoryConfigurer();configurer。setMessageConverter(this。messageConverter。getIfUnique());configurer。setMessageRecoverer(this。messageRecoverer。getIfUnique());configurer。setRetryTemplateCustomizers(this。retryTemplateCustomizers。orderedStream()。collect(Collectors。toList()));configurer。setRabbitProperties(this。properties);}Bean(namerabbitListenerContainerFactory)ConditionalOnMissingBean(namerabbitListenerContainerFactory)ConditionalOnProperty(prefixspring。rabbitmq。listener,nametype,havingValuesimple,matchIfMissingtrue)SimpleRabbitListenerContainerFactorysimpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurerconfigurer,ConnectionFactoryconnectionFactory){SimpleRabbitListenerContainerFactoryfactorynewSimpleRabbitListenerContainerFactory();configurer。configure(factory,connectionFactory);}这里还有一个DirectRabbitListenerContainerFactory配置消息监听容器有两种一种是SIMPLE,还有一种是DIRECTConfiguration(proxyBeanMethodsfalse)该注解重点,是由它的Import来解析RabbitListener注解EnableRabbitConditionalOnMissingBean(nameRabbitListenerConfigUtils。RABBITLISTENERANNOTATIONPROCESSORBEANNAME)staticclassEnableRabbitConfiguration{}}publicfinalclassSimpleRabbitListenerContainerFactoryConfigurerextendsAbstractRabbitListenerContainerFactoryConfigurerSimpleRabbitListenerContainerFactory{Overridepublicvoidconfigure(SimpleRabbitListenerContainerFactoryfactory,ConnectionFactoryconnectionFactory){PropertyMappermapPropertyMapper。get();以下会为SimpleRabbitListenerContainerFactory工厂配置基础信息(通过配置文件设置的信息)RabbitProperties。SimpleContainerconfiggetRabbitProperties()。getListener()。getSimple();configure(factory,connectionFactory,config);设置并发信息map。from(config::getConcurrency)。whenNonNull()。to(factory::setConcurrentConsumers);map。from(config::getMaxConcurrency)。whenNonNull()。to(factory::setMaxConcurrentConsumers);map。from(config::getBatchSize)。whenNonNull()。to(factory::setBatchSize);}}2。2注册核心处理器 主要注册一个BeanPostProcessor和RabbitListenerEndpointRegistry创建消息监听容器管理生命周期。Import(RabbitListenerConfigurationSelector。class)publicinterfaceEnableRabbit{}OrderpublicclassRabbitListenerConfigurationSelectorimplementsDeferredImportSelector{OverridepublicString〔〕selectImports(AnnotationMetadataimportingClassMetadata){注册如下Bean对象returnnewString〔〕{RabbitBootstrapConfiguration。class。getName()};}} RabbitBootstrapConfiguration。javapublicclassRabbitBootstrapConfigurationimplementsImportBeanDefinitionRegistrar{OverridepublicvoidregisterBeanDefinitions(AnnotationMetadataimportingClassMetadata,BeanDefinitionRegistryregistry){if(!registry。containsBeanDefinition(RabbitListenerConfigUtils。RABBITLISTENERANNOTATIONPROCESSORBEANNAME)){注册一个BeanPostProcessor对象,用来处理RabbitListener注解registry。registerBeanDefinition(RabbitListenerConfigUtils。RABBITLISTENERANNOTATIONPROCESSORBEANNAME,newRootBeanDefinition(RabbitListenerAnnotationBeanPostProcessor。class));}if(!registry。containsBeanDefinition(RabbitListenerConfigUtils。RABBITLISTENERENDPOINTREGISTRYBEANNAME)){注册消息监听容器MessageListenerContainer对象,同时还管理它们的生命周期MessageListenerContainer实现了SmartLifecycle接口(生命周期接口,在Spring中如果只是实现Lifecycle接口是没有作用的,只能实现SmartLifecycle接口)registry。registerBeanDefinition(RabbitListenerConfigUtils。RABBITLISTENERENDPOINTREGISTRYBEANNAME,newRootBeanDefinition(RabbitListenerEndpointRegistry。class));}}}2。3Bean后处理器publicclassRabbitListenerAnnotationBeanPostProcessorimplementsBeanPostProcessor,Ordered,BeanFactoryAware,BeanClassLoaderAware,EnvironmentAware,SmartInitializingSingleton{publicObjectpostProcessAfterInitialization(finalObjectbean,finalStringbeanName)throwsBeansException{获取目标类(如果是代理类)C?targetClassAopUtils。getTargetClass(bean);在targetClass类上及内部方法上查找RabbitListener注解finalTypeMetadatametadatathis。typeCache。computeIfAbsent(targetClass,this::buildMetadata);在buildMetadata方法中解析完后,开始遍历处理所有的监听方法for(ListenerMethodlm:metadata。listenerMethods){由于RabbitListener可以重复定义(一个方法上可有有多个该注解),所有这里是个数组for(RabbitListenerrabbitListener:lm。annotations){处理消息监听processAmqpListener(rabbitListener,lm。method,bean,beanName);}}if(metadata。handlerMethods。length0){processMultiMethodListeners(metadata。classAnnotations,metadata。handlerMethods,bean,beanName);}}privateTypeMetadatabuildMetadata(C?targetClass){CollectionRabbitListenerclassLevelListenersfindListenerAnnotations(targetClass);finalbooleanhasClassLevelListenersclassLevelListeners。size()0;finalListListenerMethodmethodsnewArrayList();finalListMethodmultiMethodsnewArrayList();遍历targetClass上的所有方法ReflectionUtils。doWithMethods(targetClass,method{当前Method上查找RabbitListener直接CollectionRabbitListenerlistenerAnnotationsfindListenerAnnotations(method);if(listenerAnnotations。size()0){如果方法上有该注解,则将当前的Method对象及所有的RabbitListener注释类封装到ListenerMethod中methods。add(newListenerMethod(method,listenerAnnotations。toArray(newRabbitListener〔listenerAnnotations。size()〕)));}如果targetClass类上有RabbitListener注解,那么会查找方法上是否有RabbitHandler注解注意:如果类上有RabbitListener注解,那么方法上必须使用RabbitHandler注解在这里我们不分析该种情况,我们只分析在方法上有RabbitListener注解的if(hasClassLevelListeners){RabbitHandlerrabbitHandlerAnnotationUtils。findAnnotation(method,RabbitHandler。class);if(rabbitHandler!null){multiMethods。add(method);}}},ReflectionUtils。USERDECLAREDMETHODS);if(methods。isEmpty()multiMethods。isEmpty()){returnTypeMetadata。EMPTY;}最后将解析出来的信息包装到TypeMetadata对象中returnnewTypeMetadata(methods。toArray(newListenerMethod〔methods。size()〕),multiMethods。toArray(newMethod〔multiMethods。size()〕),classLevelListeners。toArray(newRabbitListener〔classLevelListeners。size()〕));}} 在上一步将所有的方法(方法上有RabbitListener注解的)解析处理后,接下来开始处理消息监听2。4构造消息监听端点publicclassRabbitListenerAnnotationBeanPostProcessor{privatefinalRabbitListenerEndpointRegistrarregistrarnewRabbitListenerEndpointRegistrar();publicObjectpostProcessAfterInitialization(。。。)throwsBeansException{processAmqpListener(rabbitListener,lm。method,bean,beanName);}protectedvoidprocessAmqpListener(RabbitListenerrabbitListener,Methodmethod,Objectbean,StringbeanName){MethodmethodToUsecheckProxy(method,bean);为每个监听的方法Method创建端点(执行消息处理程序)MethodRabbitListenerEndpointendpointnewMethodRabbitListenerEndpoint();endpoint。setMethod(methodToUse);处理监听程序processListener(endpoint,rabbitListener,bean,methodToUse,beanName);}protectedvoidprocessListener(MethodRabbitListenerEndpointendpoint,RabbitListenerrabbitListener,Objectbean,Objecttarget,StringbeanName){endpoint。setBean(bean);endpoint。setMessageHandlerMethodFactory(this。messageHandlerMethodFactory);如果RabbitListener没有配置id属性,那么将生成如下信息idorg。springframework。amqp。rabbit。RabbitListenerEndpointContainerthis。counter。getAndIncrement();endpoint。setId(getEndpointId(rabbitListener));设置监听的队列信息(RabbitListener中配置的queues)endpoint。setQueueNames(resolveQueues(rabbitListener));并发数设置,会覆盖配置文件中配置的endpoint。setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener。concurrency(),concurrency));endpoint。setBeanFactory(this。beanFactory);。。。解析如果配置了线程池,则在指定的线程池中运行resolveExecutor(endpoint,rabbitListener,target,beanName);。。。设置应答模式resolveAckMode(endpoint,rabbitListener);。。。获取消息监听容器工厂,这就是获取在2。1中配置的SIMPLE类型的监听容器工厂默认这里没有配置将会返回nullRabbitListenerContainerF?factoryresolveContainerFactory(rabbitListener,target,beanName);this。registrar。registerEndpoint(endpoint,factory);}}2。5构建消息监听 接着上面构建完MethodRabbitListenerEndpoint对象后,将所有的监听方法保存publicclassRabbitListenerEndpointRegistrarimplementsBeanFactoryAware,InitializingBean{这里的factory是nullpublicvoidregisterEndpoint(RabbitListenerEndpointendpoint,NullableRabbitListenerContainerF?factory){factory,我们在实际创建容器之前推迟解决(这是源码中的注释说明)AmqpListenerEndpointDescriptordescriptornewAmqpListenerEndpointDescriptor(endpoint,factory);synchronized(this。endpointDescriptors){默认是falseif(this。startImmediately){Registerandstartimmediatelythis。endpointRegistry。registerListenerContainer(descriptor。endpoint,resolveContainerFactory(descriptor),true);}else{添加到集合中this。endpointDescriptors。add(descriptor);}}}} 到此消息监听RabbitListener注解的方法就处理完成了,所有的监听方法都保存到了RabbitListenerAnnotationBeanPostProcessor。registrar。endpointDescriptors集合中。2。6注册消息监听 RabbitListenerAnnotationBeanPostProcessor处理器程序实现了SmartInitializingSingleton接口,所以在所有的Bean创建完成以后会执行Bean实现了SmartInitializingSingletonafterSingletonsInstantiated的方法。publicclassRabbitListenerAnnotationBeanPostProcessorimplements。。。SmartInitializingSingleton{publicstaticfinalStringDEFAULTRABBITLISTENERCONTAINERFACTORYBEANNAMErabbitListenerContainerFprivateStringdefaultContainerFactoryBeanNameDEFAULTRABBITLISTENERCONTAINERFACTORYBEANNAME;OverridepublicvoidafterSingletonsInstantiated(){设置BeanFactory工厂this。registrar。setBeanFactory(this。beanFactory);if(this。beanFactoryinstanceofListableBeanFactory){。。。这里代码执行的集合会为空}这里默认会为null,因为registrar是直接new并没有设置if(this。registrar。getEndpointRegistry()null){if(this。endpointRegistrynull){从容器中查找,在2。1中注册的RabbitListenerEndpointRegistry对象this。endpointRegistrythis。beanFactory。getBean(RabbitListenerConfigUtils。RABBITLISTENERENDPOINTREGISTRYBEANNAME,RabbitListenerEndpointRegistry。class);}设置this。registrar。setEndpointRegistry(this。endpointRegistry);}设置默认的容器工厂BeanNameif(this。defaultContainerFactoryBeanName!null){this。registrar。setContainerFactoryBeanName(this。defaultContainerFactoryBeanName);}。。。这里注册所有的监听程序this。registrar。afterPropertiesSet();清空缓存this。typeCache。clear();}} 注册监听程序publicclassRabbitListenerEndpointRegistrarimplementsBeanFactoryAware,InitializingBean{publicvoidafterPropertiesSet(){registerAllEndpoints();}protectedvoidregisterAllEndpoints(){synchronized(this。endpointDescriptors){变量在2。5中构建的所有监听方法for(AmqpListenerEndpointDescriptordescriptor:this。endpointDescriptors){注册消息监听容器;注意这里resolveContainerFactory方法会获取容器中在2。1中创建的容器工厂this。endpointRegistry。registerListenerContainer(descriptor。endpoint,resolveContainerFactory(descriptor));}触发立即启动this。startI}}privateRabbitListenerContainerF?resolveContainerFactory(AmqpListenerEndpointDescriptordescriptor){if(descriptor。containerFactory!null){returndescriptor。containerF}elseif(this。containerFactory!null){returnthis。containerF}elseif(this。containerFactoryBeanName!null){直接进入到此处获取容器在2。1中创建的SimpleRabbitListenerContainerFactorythis。containerFactorythis。beanFactory。getBean(this。containerFactoryBeanName,RabbitListenerContainerFactory。class);returnthis。containerF}}} 注册监听容器publicclassRabbitListenerEndpointRegistryimplementsDisposableBean,SmartLifecycle,ApplicationContextAware,ApplicationListenerContextRefreshedEvent{publicvoidregisterListenerContainer(RabbitListenerEndpointendpoint,RabbitListenerContainerF?factory){registerListenerContainer(endpoint,factory,false);}publicvoidregisterListenerContainer(RabbitListenerEndpointendpoint,RabbitListenerContainerF?factory,booleanstartImmediately){获取关于id值已经在2。4中有说明Stringidendpoint。getId();synchronized(this。listenerContainers){根据在2。4中构造的消息监听端点创建消息监听容器这里的factorySimpleRabbitListenerContainerF在2。1中创建下面的步骤创建消息监听容器MessageListenerContainercontainercreateListenerContainer(endpoint,factory);this。listenerContainers。put(id,container);if(StringUtils。hasText(endpoint。getGroup())this。applicationContext!null){ListMessageListenerContainercontainerGif(this。applicationContext。containsBean(endpoint。getGroup())){containerGroupthis。applicationContext。getBean(endpoint。getGroup(),List。class);}else{containerGroupnewArrayListMessageListenerContainer();this。applicationContext。getBeanFactory()。registerSingleton(endpoint。getGroup(),containerGroup);}containerGroup。add(container);}if(this。contextRefreshed){container。lazyLoad();}if(startImmediately){startIfNecessary(container);}}}protectedMessageListenerContainercreateListenerContainer(RabbitListenerEndpointendpoint,RabbitListenerContainerF?factory){通过SimpleRabbitListenerContainerFactory创建监听容器调用父类AbstractRabbitListenerContainerFactorycreateListenerContainer方法SimpleRabbitListenerContainerFactory在2。1中创建,并且设置了配置文件中配置的相关参数信息MessageListenerContainerlistenerContainerfactory。createListenerContainer(endpoint);if(listenerContainerinstanceofInitializingBean){try{执行初始化方法((InitializingBean)listenerContainer)。afterPropertiesSet();}。。。}intcontainerPhaselistenerContainer。getPhase();if(containerPhaseInteger。MAXVALUE){acustomphasevalueif(this。phaseInteger。MAXVALUEthis。phase!containerPhase){throwexception}this。phaselistenerContainer。getPhase();}returnlistenerC}} 监听容器工厂父类创建监听容器publicabstractclassAbstractRabbitListenerContainerFactory{publicCcreateListenerContainer(RabbitListenerEndpointendpoint){调用子类SimpleRabbitListenerContainerFactory方法如下创建的SimpleMessageListenerContainer对象CinstancecreateContainerInstance();JavaUtilsjavaUtilsJavaUtils。INSTANCE。acceptIfNotNull(this。connectionFactory,instance::setConnectionFactory)。acceptIfNotNull(this。errorHandler,instance::setErrorHandler);if(this。messageConverter!nullendpoint!null){endpoint。setMessageConverter(this。messageConverter);}javaUtils。acceptIfNotNull(this。acknowledgeMode,instance::setAcknowledgeMode)。。。if(this。batchListenerthis。deBatchingEnablednull){turnoffcontainerdebatchingbydefaultforbatchlistenersinstance。setDeBatchingEnabled(false);}覆盖默认工厂的参数信息if(endpoint!null){endpointsettingsoverridingdefaultfactorysettingsjavaUtils。acceptIfNotNull(endpoint。getAutoStartup(),instance::setAutoStartup)。acceptIfNotNull(endpoint。getTaskExecutor(),instance::setTaskExecutor)。acceptIfNotNull(endpoint。getAckMode(),instance::setAcknowledgeMode);javaUtils。acceptIfNotNull(this。batchingStrategy,endpoint::setBatchingStrategy);instance。setListenerId(endpoint。getId());endpoint。setBatchListener(this。batchListener);endpoint。setupListenerContainer(instance);}。。。initializeContainer(instance,endpoint);if(this。containerCustomizer!null){this。containerCustomizer。configure(instance);}}}publicclassSimpleRabbitListenerContainerFactory{protectedSimpleMessageListenerContainercreateContainerInstance(){returnnewSimpleMessageListenerContainer();}protectedvoidinitializeContainer(SimpleMessageListenerContainerinstance,RabbitListenerEndpointendpoint){super。initializeContainer(instance,endpoint);JavaUtilsjavaUtilsJavaUtils。INSTANCE。acceptIfNotNull(this。batchSize,instance::setBatchSize);Sif(endpoint!null){concurrencyendpoint。getConcurrency();javaUtils。acceptIfNotNull(concurrency,instance::setConcurrency);}javaUtils。acceptIfCondition(concurrencynullthis。concurrentConsumers!null,this。concurrentConsumers,instance::setConcurrentConsumers)。acceptIfCondition((concurrencynull!(concurrency。contains()))this。maxConcurrentConsumers!null,this。maxConcurrentConsumers,instance::setMaxConcurrentConsumers)。acceptIfNotNull(this。startConsumerMinInterval,instance::setStartConsumerMinInterval)。acceptIfNotNull(this。stopConsumerMinInterval,instance::setStopConsumerMinInterval)。acceptIfNotNull(this。consecutiveActiveTrigger,instance::setConsecutiveActiveTrigger)。acceptIfNotNull(this。consecutiveIdleTrigger,instance::setConsecutiveIdleTrigger)。acceptIfNotNull(this。receiveTimeout,instance::setReceiveTimeout);if(Boolean。TRUE。equals(this。consumerBatchEnabled)){instance。setConsumerBatchEnabled(true);instance。setDeBatchingEnabled(true);}}} 到此消息监听容器MessageListenerContainer(SimpleMessageListenerContainer)对象创建完成, 到这里主要的消息监听容器都创建完成后接下来就是启动消息监听容器了。3启动消息监听 在2。2中注册了RabbitListenerEndpointRegistry对象,该类实现了SmartLifecycle接口,也实现了ApplicationListener接口,并且处理的是ContextRefreshedEvent事件。 上面这两个动作都会在容器上下文初始化完成以后触发,在AbstractApplicationContextrefreshfinishRefresh方法中触发publicabstractclassAbstractApplicationContext{protectedvoidfinishRefresh(){Initializelifecycleprocessorforthiscontext。初始化生命周期处理器类,默认为:DefaultLifecycleProcessorinitLifecycleProcessor();Propagaterefreshtolifecycleprocessorfirst。处理生命周期方法getLifecycleProcessor()。onRefresh();Publishthefinalevent。发布事件publishEvent(newContextRefreshedEvent(this));}} 声明周期执行publicclassDefaultLifecycleProcessorimplementsLifecycleProcessor,BeanFactoryAware{publicvoidonRefresh(){startBeans(true);this。}执行所有的SmartLifecyclestart方法privatevoidstartBeans(booleanautoStartupOnly){MapString,LifecyclelifecycleBeansgetLifecycleBeans();MapInteger,LifecycleGroupphasesnewTreeMap();lifecycleBeans。forEach((beanName,bean){if(!autoStartupOnly(beaninstanceofSmartLifecycle((SmartLifecycle)bean)。isAutoStartup())){intphasegetPhase(bean);phases。computeIfAbsent(phase,pnewLifecycleGroup(phase,this。timeoutPerShutdownPhase,lifecycleBeans,autoStartupOnly))。add(beanName,bean);}});if(!phases。isEmpty()){phases。values()。forEach(LifecycleGroup::start);}}} 开始消息监听publicclassRabbitListenerEndpointRegistryimplementsDisposableBean,SmartLifecycle,ApplicationContextAware,ApplicationListenerContextRefreshedEvent{publicvoidstart(){for(MessageListenerContainerlistenerContainer:getListenerContainers()){startIfNecessary(listenerContainer);}}listenerContainerSimpleMessageListenerContainerprivatevoidstartIfNecessary(MessageListenerContainerlistenerContainer){if(this。contextRefreshedlistenerContainer。isAutoStartup()){启动调用父类的start方法listenerContainer。start();}}}publicabstractclassAbstractMessageListenerContainer{publicvoidstart(){。。。该方法在子类中实现doStart();。。。}}publicclassSimpleMessageListenerContainerextendsAbstractMessageListenerContainer{父类中的属性privateExecutortaskExecutornewSimpleAsyncTaskExecutor();protectedvoiddoStart(){intnewConsumersinitializeConsumers();synchronized(this。consumersMonitor){intnewConsumersinitializeConsumers();。。。SetprocessorsnewHashSet();变量消费者for(BlockingQueueConsumerconsumer:this。consumers){AsyncMessageProcessingConsumer实现了Runable接口核心消息的处理就在该Consumer中进行AsyncMessageProcessingConsumerprocessornewAsyncMessageProcessingConsumer(consumer);processors。add(processor);获取线程池提交任务Runnable对象线程池对象是系统默认的SimpleAsyncTaskExecutorgetTaskExecutor()。execute(processor);if(getApplicationEventPublisher()!null){getApplicationEventPublisher()。publishEvent(newAsyncConsumerStartedEvent(this,consumer));}}waitForConsumersToStart(processors);}}protectedintinitializeConsumers(){intcount0;synchronized(this。consumersMonitor){if(this。consumersnull){this。cancellationLock。reset();根据并发数创建阻塞队列(有几个并发消费者,就创建几个阻塞队列消费者(内部使用的LinkedBlockingQueue队列))this。consumersnewHashSetBlockingQueueConsumer(this。concurrentConsumers);for(inti0;ithis。concurrentCi){创建阻塞队列消费者BlockingQueueConsumerconsumercreateBlockingQueueConsumer();this。consumers。add(consumer);}}}}protectedBlockingQueueConsumercreateBlockingQueueConsumer(){BlockingQueueCString〔〕queuesgetQueueNames();实际就是配置的prefetch属性值,该值的取值0到65535,如果是0表示没有限制该prefetch参数与Qos绑定(服务质量)channel。basicQos(this。prefetchCount,。。。)intactualPrefetchCountgetPrefetchCount()this。batchSize?getPrefetchCount():this。batchSconsumernewBlockingQueueConsumer(getConnectionFactory(),getMessagePropertiesConverter(),this。cancellationLock,getAcknowledgeMode(),isChannelTransacted(),actualPrefetchCount,isDefaultRequeueRejected(),getConsumerArguments(),isNoLocal(),isExclusive(),queues);consumer。setGlobalQos(isGlobalQos());consumer。setMissingQueuePublisher(this::publishMissingQueueEvent);if(this。declarationRetries!null){consumer。setDeclarationRetries(this。declarationRetries);}if(getFailedDeclarationRetryInterval()0){consumer。setFailedDeclarationRetryInterval(getFailedDeclarationRetryInterval());}if(this。retryDeclarationInterval!null){consumer。setRetryDeclarationInterval(this。retryDeclarationInterval);}ConsumerTagStrategyconsumerTagStrategygetConsumerTagStrategy();if(consumerTagStrategy!null){consumer。setTagStrategy(consumerTagStrategy);}consumer。setBackOffExecution(getRecoveryBackOff()。start());consumer。setShutdownTimeout(getShutdownTimeout());consumer。setApplicationEventPublisher(getApplicationEventPublisher());}} 异步消息处理消费者AsyncMessageProcessingConsumerprivatefinalclassAsyncMessageProcessingConsumerimplementsRunnable{publicvoidrun(){。。。核心方法try{该方法中主要就是从RabbitBroker上获取消息存入到消息队列中1。首先是设置设置Qos该值对应的是配置中的prefetch,channel。basicQos(this。prefetchCount)2。消费获取消息内部定义了InternalConsumer消费者重写了handleDelivery方法在该方法中会将获取到的消息包装到Delivery对象中然后调用BlockingQueueoffer方法存入队列中,offer如果存入成功返回true,失败返回false。initialize();while(isActive(this。consumer)this。consumer。hasDelivery()!this。consumer。cancelled()){从队列中不停的拿消息mainLoop();}}。。。}} 事件处理publicclassRabbitListenerEndpointRegistryimplementsDisposableBean,SmartLifecycle,ApplicationContextAware,ApplicationListenerContextRefreshedEvent{privatebooleancontextRpublicvoidonApplicationEvent(ContextRefreshedEventevent){if(event。getApplicationContext()。equals(this。applicationContext)){当收到事件后设置为true表示上下文已经初始化完成了this。contextR}}}4总结流程开启监听功能 RabbitAutoConfiguration》RabbitAnnotationDrivenConfiguration》EnableRabbitConfiguration》EnableRabbit注册BeanPostProcessor 注册RabbitListenerAnnotationBeanPostProcessor处理器处理RabbitListener和RabbitHandler注解构造Endpoint RabbitListenerAnnotationBeanPostProcessor类 将上一步解析出来的所有方法及对应的RabbitListener注解中配置的信息进行包装到MethodRabbitListenerEndpoint中 说明:RabbitListener注解中的errorHandler属性可以是SpEL表达式也可以是一个Bean的名称 该步骤中主要就是设置相关的一些属性信息到Endpoint中,比如:ackMode,queueName,concurrency等信息。 构造完Endpoint对象后将其保存到RabbitListenerEndpointRegistrar中。注册Endpoint RabbitListenerAnnotationBeanPostProcessor类实现了SmartInitializingSingleton接口,当所有的Bean初始化完成以后会执行实现了SmartInitializingSingleton接口Bean的回调方法afterSingletonsInstantiated。 在afterSingletonsInstantiated方法中调用RabbitListenerAnnotationBeanPostProcessor。registrar(RabbitListenerEndpointRegistrar)afterPropertiesSet 方法。 在afterPropertiesSet方法中就是注册Endpoint了,在该方法中将所有的Endpoint再封装成MessageListenerContainer(SimpleMessageListenerContainer) 对象,最后将MessageListenerContainer对象保存到RabbitListenerEndpointRegistry。listenerContainers的Map集合中。 在这里是还没有启动所有的监听程序。启动消息监听 RabbitListenerEndpointRegistry对象Bean实现了SmartLifecycle接口,所以容器上下文执行完(刷新完)以后会调用实现了该接口的会滴方法start,启动消息监听。 SpringBoot多数据源配置详解 SpringBoot邮件发送示例 Springboot面试题整理附答案 SpringBoot配置文件你了解多少? SpringBoot项目查看线上日志 springbootmybatisjpa实现读写分离 Springboot整合openfeign使用详解 SpringBootRabbitMQ消息可靠发送与接收 Springboot整合MyBatis复杂查询应用 Springboot整合RabbitMQ死信队列详解