城市直播房产教育博客汽车
投稿投诉
汽车报价
买车新车
博客专栏
专题精品
教育留学
高考读书
房产家居
彩票视频
直播黑猫
投资微博
城市上海
政务旅游

Springboot下的RabbitMQ消息监听源码解读

4月12日 夜如影投稿
  1自动配置1。1配置连接工厂publicclassRabbitAutoConfiguration{Configuration(proxyBeanMethodsfalse)ConditionalOnMissingBean(ConnectionFactory。class)protectedstaticclassRabbitConnectionFactoryCreator{BeanpublicCachingConnectionFactoryrabbitConnectionFactory(RabbitPropertiesproperties,ObjectProviderConnectionNameStrategyconnectionNameStrategy)throwsException{PropertyMappermapPropertyMapper。get();CachingConnectionFactoryfactorynewCachingConnectionFactory(getRabbitConnectionFactoryBean(properties)。getObject());设置属性值}privateRabbitConnectionFactoryBeangetRabbitConnectionFactoryBean(RabbitPropertiesproperties)throwsException{PropertyMappermapPropertyMapper。get();RabbitConnectionFactoryBeanfactorynewRabbitConnectionFactoryBean();。。。设置属性值factory。afterPropertiesSet();}}}1。2配置RabbitTemplatepublicclassRabbitAutoConfiguration{Configuration(proxyBeanMethodsfalse)Import(RabbitConnectionFactoryCreator。class)protectedstaticclassRabbitTemplateConfiguration{可自定义该类来自定义自己的实现BeanConditionalOnMissingBeanpublicRabbitTemplateConfigurerrabbitTemplateConfigurer(RabbitPropertiesproperties,ObjectProviderMessageConvertermessageConverter,ObjectProviderRabbitRetryTemplateCustomizerretryTemplateCustomizers){RabbitTemplateConfigurerconfigurernewRabbitTemplateConfigurer();。。。配置消息转换}BeanConditionalOnSingleCandidate(ConnectionFactory。class)ConditionalOnMissingBean(RabbitOperations。class)publicRabbitTemplaterabbitTemplate(RabbitTemplateConfigurerconfigurer,ConnectionFactoryconnectionFactory){RabbitTemplatetemplatenewRabbitTemplate();configurer。configure(template,connectionFactory);}}}
  以上配置比较简单,都是一些基本的配置,配置Rabbit的连接工厂,配置Template,客户端操作的模版RabbitTemplate对象。2监听配置2。1监听容器配置消息监听的核心就在导入的类中Import(RabbitAnnotationDrivenConfiguration。class)publicclassRabbitAutoConfiguration{}
  注解核心配置Configuration(proxyBeanMethodsfalse)ConditionalOnClass(EnableRabbit。class)classRabbitAnnotationDrivenConfiguration{privatefinalObjectProviderMessageConvertermessageCprivatefinalObjectProviderMessageRecoverermessageRprivatefinalObjectProviderRabbitRetryTemplateCustomizerretryTemplateCprivatefinalRabbitPRabbitAnnotationDrivenConfiguration(ObjectProviderMessageConvertermessageConverter,ObjectProviderMessageRecoverermessageRecoverer,ObjectProviderRabbitRetryTemplateCustomizerretryTemplateCustomizers,RabbitPropertiesproperties){this。messageConvertermessageCthis。messageRecoverermessageRthis。retryTemplateCustomizersretryTemplateCthis。}BeanConditionalOnMissingBeanSimpleRabbitListenerContainerFactoryConfigurersimpleRabbitListenerContainerFactoryConfigurer(){SimpleRabbitListenerContainerFactoryConfigurerconfigurernewSimpleRabbitListenerContainerFactoryConfigurer();configurer。setMessageConverter(this。messageConverter。getIfUnique());configurer。setMessageRecoverer(this。messageRecoverer。getIfUnique());configurer。setRetryTemplateCustomizers(this。retryTemplateCustomizers。orderedStream()。collect(Collectors。toList()));configurer。setRabbitProperties(this。properties);}Bean(namerabbitListenerContainerFactory)ConditionalOnMissingBean(namerabbitListenerContainerFactory)ConditionalOnProperty(prefixspring。rabbitmq。listener,nametype,havingValuesimple,matchIfMissingtrue)SimpleRabbitListenerContainerFactorysimpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurerconfigurer,ConnectionFactoryconnectionFactory){SimpleRabbitListenerContainerFactoryfactorynewSimpleRabbitListenerContainerFactory();configurer。configure(factory,connectionFactory);}这里还有一个DirectRabbitListenerContainerFactory配置消息监听容器有两种一种是SIMPLE,还有一种是DIRECTConfiguration(proxyBeanMethodsfalse)该注解重点,是由它的Import来解析RabbitListener注解EnableRabbitConditionalOnMissingBean(nameRabbitListenerConfigUtils。RABBITLISTENERANNOTATIONPROCESSORBEANNAME)staticclassEnableRabbitConfiguration{}}publicfinalclassSimpleRabbitListenerContainerFactoryConfigurerextendsAbstractRabbitListenerContainerFactoryConfigurerSimpleRabbitListenerContainerFactory{Overridepublicvoidconfigure(SimpleRabbitListenerContainerFactoryfactory,ConnectionFactoryconnectionFactory){PropertyMappermapPropertyMapper。get();以下会为SimpleRabbitListenerContainerFactory工厂配置基础信息(通过配置文件设置的信息)RabbitProperties。SimpleContainerconfiggetRabbitProperties()。getListener()。getSimple();configure(factory,connectionFactory,config);设置并发信息map。from(config::getConcurrency)。whenNonNull()。to(factory::setConcurrentConsumers);map。from(config::getMaxConcurrency)。whenNonNull()。to(factory::setMaxConcurrentConsumers);map。from(config::getBatchSize)。whenNonNull()。to(factory::setBatchSize);}}2。2注册核心处理器
  主要注册一个BeanPostProcessor和RabbitListenerEndpointRegistry创建消息监听容器管理生命周期。Import(RabbitListenerConfigurationSelector。class)publicinterfaceEnableRabbit{}OrderpublicclassRabbitListenerConfigurationSelectorimplementsDeferredImportSelector{OverridepublicString〔〕selectImports(AnnotationMetadataimportingClassMetadata){注册如下Bean对象returnnewString〔〕{RabbitBootstrapConfiguration。class。getName()};}}
  RabbitBootstrapConfiguration。javapublicclassRabbitBootstrapConfigurationimplementsImportBeanDefinitionRegistrar{OverridepublicvoidregisterBeanDefinitions(AnnotationMetadataimportingClassMetadata,BeanDefinitionRegistryregistry){if(!registry。containsBeanDefinition(RabbitListenerConfigUtils。RABBITLISTENERANNOTATIONPROCESSORBEANNAME)){注册一个BeanPostProcessor对象,用来处理RabbitListener注解registry。registerBeanDefinition(RabbitListenerConfigUtils。RABBITLISTENERANNOTATIONPROCESSORBEANNAME,newRootBeanDefinition(RabbitListenerAnnotationBeanPostProcessor。class));}if(!registry。containsBeanDefinition(RabbitListenerConfigUtils。RABBITLISTENERENDPOINTREGISTRYBEANNAME)){注册消息监听容器MessageListenerContainer对象,同时还管理它们的生命周期MessageListenerContainer实现了SmartLifecycle接口(生命周期接口,在Spring中如果只是实现Lifecycle接口是没有作用的,只能实现SmartLifecycle接口)registry。registerBeanDefinition(RabbitListenerConfigUtils。RABBITLISTENERENDPOINTREGISTRYBEANNAME,newRootBeanDefinition(RabbitListenerEndpointRegistry。class));}}}2。3Bean后处理器publicclassRabbitListenerAnnotationBeanPostProcessorimplementsBeanPostProcessor,Ordered,BeanFactoryAware,BeanClassLoaderAware,EnvironmentAware,SmartInitializingSingleton{publicObjectpostProcessAfterInitialization(finalObjectbean,finalStringbeanName)throwsBeansException{获取目标类(如果是代理类)C?targetClassAopUtils。getTargetClass(bean);在targetClass类上及内部方法上查找RabbitListener注解finalTypeMetadatametadatathis。typeCache。computeIfAbsent(targetClass,this::buildMetadata);在buildMetadata方法中解析完后,开始遍历处理所有的监听方法for(ListenerMethodlm:metadata。listenerMethods){由于RabbitListener可以重复定义(一个方法上可有有多个该注解),所有这里是个数组for(RabbitListenerrabbitListener:lm。annotations){处理消息监听processAmqpListener(rabbitListener,lm。method,bean,beanName);}}if(metadata。handlerMethods。length0){processMultiMethodListeners(metadata。classAnnotations,metadata。handlerMethods,bean,beanName);}}privateTypeMetadatabuildMetadata(C?targetClass){CollectionRabbitListenerclassLevelListenersfindListenerAnnotations(targetClass);finalbooleanhasClassLevelListenersclassLevelListeners。size()0;finalListListenerMethodmethodsnewArrayList();finalListMethodmultiMethodsnewArrayList();遍历targetClass上的所有方法ReflectionUtils。doWithMethods(targetClass,method{当前Method上查找RabbitListener直接CollectionRabbitListenerlistenerAnnotationsfindListenerAnnotations(method);if(listenerAnnotations。size()0){如果方法上有该注解,则将当前的Method对象及所有的RabbitListener注释类封装到ListenerMethod中methods。add(newListenerMethod(method,listenerAnnotations。toArray(newRabbitListener〔listenerAnnotations。size()〕)));}如果targetClass类上有RabbitListener注解,那么会查找方法上是否有RabbitHandler注解注意:如果类上有RabbitListener注解,那么方法上必须使用RabbitHandler注解在这里我们不分析该种情况,我们只分析在方法上有RabbitListener注解的if(hasClassLevelListeners){RabbitHandlerrabbitHandlerAnnotationUtils。findAnnotation(method,RabbitHandler。class);if(rabbitHandler!null){multiMethods。add(method);}}},ReflectionUtils。USERDECLAREDMETHODS);if(methods。isEmpty()multiMethods。isEmpty()){returnTypeMetadata。EMPTY;}最后将解析出来的信息包装到TypeMetadata对象中returnnewTypeMetadata(methods。toArray(newListenerMethod〔methods。size()〕),multiMethods。toArray(newMethod〔multiMethods。size()〕),classLevelListeners。toArray(newRabbitListener〔classLevelListeners。size()〕));}}
  在上一步将所有的方法(方法上有RabbitListener注解的)解析处理后,接下来开始处理消息监听2。4构造消息监听端点publicclassRabbitListenerAnnotationBeanPostProcessor{privatefinalRabbitListenerEndpointRegistrarregistrarnewRabbitListenerEndpointRegistrar();publicObjectpostProcessAfterInitialization(。。。)throwsBeansException{processAmqpListener(rabbitListener,lm。method,bean,beanName);}protectedvoidprocessAmqpListener(RabbitListenerrabbitListener,Methodmethod,Objectbean,StringbeanName){MethodmethodToUsecheckProxy(method,bean);为每个监听的方法Method创建端点(执行消息处理程序)MethodRabbitListenerEndpointendpointnewMethodRabbitListenerEndpoint();endpoint。setMethod(methodToUse);处理监听程序processListener(endpoint,rabbitListener,bean,methodToUse,beanName);}protectedvoidprocessListener(MethodRabbitListenerEndpointendpoint,RabbitListenerrabbitListener,Objectbean,Objecttarget,StringbeanName){endpoint。setBean(bean);endpoint。setMessageHandlerMethodFactory(this。messageHandlerMethodFactory);如果RabbitListener没有配置id属性,那么将生成如下信息idorg。springframework。amqp。rabbit。RabbitListenerEndpointContainerthis。counter。getAndIncrement();endpoint。setId(getEndpointId(rabbitListener));设置监听的队列信息(RabbitListener中配置的queues)endpoint。setQueueNames(resolveQueues(rabbitListener));并发数设置,会覆盖配置文件中配置的endpoint。setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener。concurrency(),concurrency));endpoint。setBeanFactory(this。beanFactory);。。。解析如果配置了线程池,则在指定的线程池中运行resolveExecutor(endpoint,rabbitListener,target,beanName);。。。设置应答模式resolveAckMode(endpoint,rabbitListener);。。。获取消息监听容器工厂,这就是获取在2。1中配置的SIMPLE类型的监听容器工厂默认这里没有配置将会返回nullRabbitListenerContainerF?factoryresolveContainerFactory(rabbitListener,target,beanName);this。registrar。registerEndpoint(endpoint,factory);}}2。5构建消息监听
  接着上面构建完MethodRabbitListenerEndpoint对象后,将所有的监听方法保存publicclassRabbitListenerEndpointRegistrarimplementsBeanFactoryAware,InitializingBean{这里的factory是nullpublicvoidregisterEndpoint(RabbitListenerEndpointendpoint,NullableRabbitListenerContainerF?factory){factory,我们在实际创建容器之前推迟解决(这是源码中的注释说明)AmqpListenerEndpointDescriptordescriptornewAmqpListenerEndpointDescriptor(endpoint,factory);synchronized(this。endpointDescriptors){默认是falseif(this。startImmediately){Registerandstartimmediatelythis。endpointRegistry。registerListenerContainer(descriptor。endpoint,resolveContainerFactory(descriptor),true);}else{添加到集合中this。endpointDescriptors。add(descriptor);}}}}
  到此消息监听RabbitListener注解的方法就处理完成了,所有的监听方法都保存到了RabbitListenerAnnotationBeanPostProcessor。registrar。endpointDescriptors集合中。2。6注册消息监听
  RabbitListenerAnnotationBeanPostProcessor处理器程序实现了SmartInitializingSingleton接口,所以在所有的Bean创建完成以后会执行Bean实现了SmartInitializingSingletonafterSingletonsInstantiated的方法。publicclassRabbitListenerAnnotationBeanPostProcessorimplements。。。SmartInitializingSingleton{publicstaticfinalStringDEFAULTRABBITLISTENERCONTAINERFACTORYBEANNAMErabbitListenerContainerFprivateStringdefaultContainerFactoryBeanNameDEFAULTRABBITLISTENERCONTAINERFACTORYBEANNAME;OverridepublicvoidafterSingletonsInstantiated(){设置BeanFactory工厂this。registrar。setBeanFactory(this。beanFactory);if(this。beanFactoryinstanceofListableBeanFactory){。。。这里代码执行的集合会为空}这里默认会为null,因为registrar是直接new并没有设置if(this。registrar。getEndpointRegistry()null){if(this。endpointRegistrynull){从容器中查找,在2。1中注册的RabbitListenerEndpointRegistry对象this。endpointRegistrythis。beanFactory。getBean(RabbitListenerConfigUtils。RABBITLISTENERENDPOINTREGISTRYBEANNAME,RabbitListenerEndpointRegistry。class);}设置this。registrar。setEndpointRegistry(this。endpointRegistry);}设置默认的容器工厂BeanNameif(this。defaultContainerFactoryBeanName!null){this。registrar。setContainerFactoryBeanName(this。defaultContainerFactoryBeanName);}。。。这里注册所有的监听程序this。registrar。afterPropertiesSet();清空缓存this。typeCache。clear();}}
  注册监听程序publicclassRabbitListenerEndpointRegistrarimplementsBeanFactoryAware,InitializingBean{publicvoidafterPropertiesSet(){registerAllEndpoints();}protectedvoidregisterAllEndpoints(){synchronized(this。endpointDescriptors){变量在2。5中构建的所有监听方法for(AmqpListenerEndpointDescriptordescriptor:this。endpointDescriptors){注册消息监听容器;注意这里resolveContainerFactory方法会获取容器中在2。1中创建的容器工厂this。endpointRegistry。registerListenerContainer(descriptor。endpoint,resolveContainerFactory(descriptor));}触发立即启动this。startI}}privateRabbitListenerContainerF?resolveContainerFactory(AmqpListenerEndpointDescriptordescriptor){if(descriptor。containerFactory!null){returndescriptor。containerF}elseif(this。containerFactory!null){returnthis。containerF}elseif(this。containerFactoryBeanName!null){直接进入到此处获取容器在2。1中创建的SimpleRabbitListenerContainerFactorythis。containerFactorythis。beanFactory。getBean(this。containerFactoryBeanName,RabbitListenerContainerFactory。class);returnthis。containerF}}}
  注册监听容器publicclassRabbitListenerEndpointRegistryimplementsDisposableBean,SmartLifecycle,ApplicationContextAware,ApplicationListenerContextRefreshedEvent{publicvoidregisterListenerContainer(RabbitListenerEndpointendpoint,RabbitListenerContainerF?factory){registerListenerContainer(endpoint,factory,false);}publicvoidregisterListenerContainer(RabbitListenerEndpointendpoint,RabbitListenerContainerF?factory,booleanstartImmediately){获取关于id值已经在2。4中有说明Stringidendpoint。getId();synchronized(this。listenerContainers){根据在2。4中构造的消息监听端点创建消息监听容器这里的factorySimpleRabbitListenerContainerF在2。1中创建下面的步骤创建消息监听容器MessageListenerContainercontainercreateListenerContainer(endpoint,factory);this。listenerContainers。put(id,container);if(StringUtils。hasText(endpoint。getGroup())this。applicationContext!null){ListMessageListenerContainercontainerGif(this。applicationContext。containsBean(endpoint。getGroup())){containerGroupthis。applicationContext。getBean(endpoint。getGroup(),List。class);}else{containerGroupnewArrayListMessageListenerContainer();this。applicationContext。getBeanFactory()。registerSingleton(endpoint。getGroup(),containerGroup);}containerGroup。add(container);}if(this。contextRefreshed){container。lazyLoad();}if(startImmediately){startIfNecessary(container);}}}protectedMessageListenerContainercreateListenerContainer(RabbitListenerEndpointendpoint,RabbitListenerContainerF?factory){通过SimpleRabbitListenerContainerFactory创建监听容器调用父类AbstractRabbitListenerContainerFactorycreateListenerContainer方法SimpleRabbitListenerContainerFactory在2。1中创建,并且设置了配置文件中配置的相关参数信息MessageListenerContainerlistenerContainerfactory。createListenerContainer(endpoint);if(listenerContainerinstanceofInitializingBean){try{执行初始化方法((InitializingBean)listenerContainer)。afterPropertiesSet();}。。。}intcontainerPhaselistenerContainer。getPhase();if(containerPhaseInteger。MAXVALUE){acustomphasevalueif(this。phaseInteger。MAXVALUEthis。phase!containerPhase){throwexception}this。phaselistenerContainer。getPhase();}returnlistenerC}}
  监听容器工厂父类创建监听容器publicabstractclassAbstractRabbitListenerContainerFactory{publicCcreateListenerContainer(RabbitListenerEndpointendpoint){调用子类SimpleRabbitListenerContainerFactory方法如下创建的SimpleMessageListenerContainer对象CinstancecreateContainerInstance();JavaUtilsjavaUtilsJavaUtils。INSTANCE。acceptIfNotNull(this。connectionFactory,instance::setConnectionFactory)。acceptIfNotNull(this。errorHandler,instance::setErrorHandler);if(this。messageConverter!nullendpoint!null){endpoint。setMessageConverter(this。messageConverter);}javaUtils。acceptIfNotNull(this。acknowledgeMode,instance::setAcknowledgeMode)。。。if(this。batchListenerthis。deBatchingEnablednull){turnoffcontainerdebatchingbydefaultforbatchlistenersinstance。setDeBatchingEnabled(false);}覆盖默认工厂的参数信息if(endpoint!null){endpointsettingsoverridingdefaultfactorysettingsjavaUtils。acceptIfNotNull(endpoint。getAutoStartup(),instance::setAutoStartup)。acceptIfNotNull(endpoint。getTaskExecutor(),instance::setTaskExecutor)。acceptIfNotNull(endpoint。getAckMode(),instance::setAcknowledgeMode);javaUtils。acceptIfNotNull(this。batchingStrategy,endpoint::setBatchingStrategy);instance。setListenerId(endpoint。getId());endpoint。setBatchListener(this。batchListener);endpoint。setupListenerContainer(instance);}。。。initializeContainer(instance,endpoint);if(this。containerCustomizer!null){this。containerCustomizer。configure(instance);}}}publicclassSimpleRabbitListenerContainerFactory{protectedSimpleMessageListenerContainercreateContainerInstance(){returnnewSimpleMessageListenerContainer();}protectedvoidinitializeContainer(SimpleMessageListenerContainerinstance,RabbitListenerEndpointendpoint){super。initializeContainer(instance,endpoint);JavaUtilsjavaUtilsJavaUtils。INSTANCE。acceptIfNotNull(this。batchSize,instance::setBatchSize);Sif(endpoint!null){concurrencyendpoint。getConcurrency();javaUtils。acceptIfNotNull(concurrency,instance::setConcurrency);}javaUtils。acceptIfCondition(concurrencynullthis。concurrentConsumers!null,this。concurrentConsumers,instance::setConcurrentConsumers)。acceptIfCondition((concurrencynull!(concurrency。contains()))this。maxConcurrentConsumers!null,this。maxConcurrentConsumers,instance::setMaxConcurrentConsumers)。acceptIfNotNull(this。startConsumerMinInterval,instance::setStartConsumerMinInterval)。acceptIfNotNull(this。stopConsumerMinInterval,instance::setStopConsumerMinInterval)。acceptIfNotNull(this。consecutiveActiveTrigger,instance::setConsecutiveActiveTrigger)。acceptIfNotNull(this。consecutiveIdleTrigger,instance::setConsecutiveIdleTrigger)。acceptIfNotNull(this。receiveTimeout,instance::setReceiveTimeout);if(Boolean。TRUE。equals(this。consumerBatchEnabled)){instance。setConsumerBatchEnabled(true);instance。setDeBatchingEnabled(true);}}}
  到此消息监听容器MessageListenerContainer(SimpleMessageListenerContainer)对象创建完成,
  到这里主要的消息监听容器都创建完成后接下来就是启动消息监听容器了。3启动消息监听
  在2。2中注册了RabbitListenerEndpointRegistry对象,该类实现了SmartLifecycle接口,也实现了ApplicationListener接口,并且处理的是ContextRefreshedEvent事件。
  上面这两个动作都会在容器上下文初始化完成以后触发,在AbstractApplicationContextrefreshfinishRefresh方法中触发publicabstractclassAbstractApplicationContext{protectedvoidfinishRefresh(){Initializelifecycleprocessorforthiscontext。初始化生命周期处理器类,默认为:DefaultLifecycleProcessorinitLifecycleProcessor();Propagaterefreshtolifecycleprocessorfirst。处理生命周期方法getLifecycleProcessor()。onRefresh();Publishthefinalevent。发布事件publishEvent(newContextRefreshedEvent(this));}}
  声明周期执行publicclassDefaultLifecycleProcessorimplementsLifecycleProcessor,BeanFactoryAware{publicvoidonRefresh(){startBeans(true);this。}执行所有的SmartLifecyclestart方法privatevoidstartBeans(booleanautoStartupOnly){MapString,LifecyclelifecycleBeansgetLifecycleBeans();MapInteger,LifecycleGroupphasesnewTreeMap();lifecycleBeans。forEach((beanName,bean){if(!autoStartupOnly(beaninstanceofSmartLifecycle((SmartLifecycle)bean)。isAutoStartup())){intphasegetPhase(bean);phases。computeIfAbsent(phase,pnewLifecycleGroup(phase,this。timeoutPerShutdownPhase,lifecycleBeans,autoStartupOnly))。add(beanName,bean);}});if(!phases。isEmpty()){phases。values()。forEach(LifecycleGroup::start);}}}
  开始消息监听publicclassRabbitListenerEndpointRegistryimplementsDisposableBean,SmartLifecycle,ApplicationContextAware,ApplicationListenerContextRefreshedEvent{publicvoidstart(){for(MessageListenerContainerlistenerContainer:getListenerContainers()){startIfNecessary(listenerContainer);}}listenerContainerSimpleMessageListenerContainerprivatevoidstartIfNecessary(MessageListenerContainerlistenerContainer){if(this。contextRefreshedlistenerContainer。isAutoStartup()){启动调用父类的start方法listenerContainer。start();}}}publicabstractclassAbstractMessageListenerContainer{publicvoidstart(){。。。该方法在子类中实现doStart();。。。}}publicclassSimpleMessageListenerContainerextendsAbstractMessageListenerContainer{父类中的属性privateExecutortaskExecutornewSimpleAsyncTaskExecutor();protectedvoiddoStart(){intnewConsumersinitializeConsumers();synchronized(this。consumersMonitor){intnewConsumersinitializeConsumers();。。。SetprocessorsnewHashSet();变量消费者for(BlockingQueueConsumerconsumer:this。consumers){AsyncMessageProcessingConsumer实现了Runable接口核心消息的处理就在该Consumer中进行AsyncMessageProcessingConsumerprocessornewAsyncMessageProcessingConsumer(consumer);processors。add(processor);获取线程池提交任务Runnable对象线程池对象是系统默认的SimpleAsyncTaskExecutorgetTaskExecutor()。execute(processor);if(getApplicationEventPublisher()!null){getApplicationEventPublisher()。publishEvent(newAsyncConsumerStartedEvent(this,consumer));}}waitForConsumersToStart(processors);}}protectedintinitializeConsumers(){intcount0;synchronized(this。consumersMonitor){if(this。consumersnull){this。cancellationLock。reset();根据并发数创建阻塞队列(有几个并发消费者,就创建几个阻塞队列消费者(内部使用的LinkedBlockingQueue队列))this。consumersnewHashSetBlockingQueueConsumer(this。concurrentConsumers);for(inti0;ithis。concurrentCi){创建阻塞队列消费者BlockingQueueConsumerconsumercreateBlockingQueueConsumer();this。consumers。add(consumer);}}}}protectedBlockingQueueConsumercreateBlockingQueueConsumer(){BlockingQueueCString〔〕queuesgetQueueNames();实际就是配置的prefetch属性值,该值的取值0到65535,如果是0表示没有限制该prefetch参数与Qos绑定(服务质量)channel。basicQos(this。prefetchCount,。。。)intactualPrefetchCountgetPrefetchCount()this。batchSize?getPrefetchCount():this。batchSconsumernewBlockingQueueConsumer(getConnectionFactory(),getMessagePropertiesConverter(),this。cancellationLock,getAcknowledgeMode(),isChannelTransacted(),actualPrefetchCount,isDefaultRequeueRejected(),getConsumerArguments(),isNoLocal(),isExclusive(),queues);consumer。setGlobalQos(isGlobalQos());consumer。setMissingQueuePublisher(this::publishMissingQueueEvent);if(this。declarationRetries!null){consumer。setDeclarationRetries(this。declarationRetries);}if(getFailedDeclarationRetryInterval()0){consumer。setFailedDeclarationRetryInterval(getFailedDeclarationRetryInterval());}if(this。retryDeclarationInterval!null){consumer。setRetryDeclarationInterval(this。retryDeclarationInterval);}ConsumerTagStrategyconsumerTagStrategygetConsumerTagStrategy();if(consumerTagStrategy!null){consumer。setTagStrategy(consumerTagStrategy);}consumer。setBackOffExecution(getRecoveryBackOff()。start());consumer。setShutdownTimeout(getShutdownTimeout());consumer。setApplicationEventPublisher(getApplicationEventPublisher());}}
  异步消息处理消费者AsyncMessageProcessingConsumerprivatefinalclassAsyncMessageProcessingConsumerimplementsRunnable{publicvoidrun(){。。。核心方法try{该方法中主要就是从RabbitBroker上获取消息存入到消息队列中1。首先是设置设置Qos该值对应的是配置中的prefetch,channel。basicQos(this。prefetchCount)2。消费获取消息内部定义了InternalConsumer消费者重写了handleDelivery方法在该方法中会将获取到的消息包装到Delivery对象中然后调用BlockingQueueoffer方法存入队列中,offer如果存入成功返回true,失败返回false。initialize();while(isActive(this。consumer)this。consumer。hasDelivery()!this。consumer。cancelled()){从队列中不停的拿消息mainLoop();}}。。。}}
  事件处理publicclassRabbitListenerEndpointRegistryimplementsDisposableBean,SmartLifecycle,ApplicationContextAware,ApplicationListenerContextRefreshedEvent{privatebooleancontextRpublicvoidonApplicationEvent(ContextRefreshedEventevent){if(event。getApplicationContext()。equals(this。applicationContext)){当收到事件后设置为true表示上下文已经初始化完成了this。contextR}}}4总结流程开启监听功能
  RabbitAutoConfiguration》RabbitAnnotationDrivenConfiguration》EnableRabbitConfiguration》EnableRabbit注册BeanPostProcessor
  注册RabbitListenerAnnotationBeanPostProcessor处理器处理RabbitListener和RabbitHandler注解构造Endpoint
  RabbitListenerAnnotationBeanPostProcessor类
  将上一步解析出来的所有方法及对应的RabbitListener注解中配置的信息进行包装到MethodRabbitListenerEndpoint中
  说明:RabbitListener注解中的errorHandler属性可以是SpEL表达式也可以是一个Bean的名称
  该步骤中主要就是设置相关的一些属性信息到Endpoint中,比如:ackMode,queueName,concurrency等信息。
  构造完Endpoint对象后将其保存到RabbitListenerEndpointRegistrar中。注册Endpoint
  RabbitListenerAnnotationBeanPostProcessor类实现了SmartInitializingSingleton接口,当所有的Bean初始化完成以后会执行实现了SmartInitializingSingleton接口Bean的回调方法afterSingletonsInstantiated。
  在afterSingletonsInstantiated方法中调用RabbitListenerAnnotationBeanPostProcessor。registrar(RabbitListenerEndpointRegistrar)afterPropertiesSet
  方法。
  在afterPropertiesSet方法中就是注册Endpoint了,在该方法中将所有的Endpoint再封装成MessageListenerContainer(SimpleMessageListenerContainer)
  对象,最后将MessageListenerContainer对象保存到RabbitListenerEndpointRegistry。listenerContainers的Map集合中。
  在这里是还没有启动所有的监听程序。启动消息监听
  RabbitListenerEndpointRegistry对象Bean实现了SmartLifecycle接口,所以容器上下文执行完(刷新完)以后会调用实现了该接口的会滴方法start,启动消息监听。
  SpringBoot多数据源配置详解
  SpringBoot邮件发送示例
  Springboot面试题整理附答案
  SpringBoot配置文件你了解多少?
  SpringBoot项目查看线上日志
  springbootmybatisjpa实现读写分离
  Springboot整合openfeign使用详解
  SpringBootRabbitMQ消息可靠发送与接收
  Springboot整合MyBatis复杂查询应用
  Springboot整合RabbitMQ死信队列详解
投诉 评论 转载

格力空调和美的空调哪个质量好?有对比更显高性价比目前市场中的空调品牌众多,不过真正称得上优质的,格力空调和美的空调可以说是名列其中。只不过对于大部分朋友来说,可能对于这两款空调的具体情况还不是很清楚,也不知道具体该选择哪一款……新能源电力市场交易和海上风电投资机会解读纪要声明:本材料仅为公开资料整理,不涉及投资建议。记录或者翻译可能产生误差,仅供参考,如有异议,请联系删除。纪要核心:新能源电力市场交易:电力交易统一市场指导意见有利于……一个差评扣200到300元!现在改了!美团外卖试点骑手柔性考之前,外卖骑手收到差评将被处罚,现在改了!美团外卖试点骑手柔性考评骑手收差评从罚款改扣分此前,外卖骑手收差评将被处罚的规定,让骑手和消费者之间的关……财经辣评用户过亿的美图,摆脱广告依赖真能有戏吗文羊城晚报财经评论员戚耀琪3月30日,美图发布了2021年年度业绩,购买以太币的公允价值增加4。256亿元成为亮点之一,尽管美图秀秀的用户过亿,美图公司却长期被资本市场冷……回收系统开发回收废品在许多人眼中是一个又累又脏的职业,但现在还是有一些年轻人选择做这个行业。随着互联网的兴起,废品回收行业也插上了互联网的翅膀,让可回收物品能够实现循环再利用。有互联……开发一个APP所需要的成本现代人由于忙碌的工作,很少进行社交,手机成为人与人之间链接的重要工具。微信、微博等主流社交APP垄断了大量的社交人员和渠道,但大众的需求永远存在,走正确的路线、场景应用的垂直社……我今天才下载的今日头条,请问你们觉得抖音好些还是今日头条好呢从老头我本人的角度和认识度来说,本老头我觉得今日头条好。为什么本老头我这样认为呢?因为本老头我对什么拍抖音、拍视频、弄直播等其它五花八门的自媒体,十窍中零窍都不通!至于今……从名医主刀到名医主导,互联网医疗进入4。0时代熟悉互联网医疗的人士对名医主刀这个名字大概不陌生,但外界对它的印象多停留在帮医生离开执业地点去异地为患者开展手术的飞刀模式阶段。经过数年的发展,名医主刀已悄然迭代到4。0版本,……家用电脑硬盘,选蓝盘还是黑盘,哪个好用稳定?大家好!我是大明、所谓的蓝盘,黑盘,其实是西部数据所生产的机械硬盘,性能也是不一样的,黑盘多用于服务器或商用电脑读写速度快一些,蓝盘多用于家用电脑,读写速度也很好,价位低一些,……大家帮推荐一下几款性价比高的,DAC解码器?大家帮推荐一下几款性价比高的,DAC解码器吗?功放前级。连接电脑和功放的DAC解码器,最好有USB插孔的,请问有哪些性价比高的机器推荐?价格800元以内的发烧友们都……助听器会影响老人身体健康吗?老人带听助听器没有坏处,但是要定期到医院检测听力,佩戴适合自己的助听器。如果老人出现听力减退,是可以使用助听器的,它可以更好的提高生活的质量。不过在选择助听器时一定要慎重……Springboot下的RabbitMQ消息监听源码解读1自动配置1。1配置连接工厂publicclassRabbitAutoConfiguration{Configuration(proxyBeanMethodsfalse)Con……
未来两年,苹果或将发布两款新iPhoneSE618提前到来,不知道买什么手机?我来告诉你连汉服小姐姐都被圈粉?绿厂这款Reno6星黛紫究竟什么来路三星拉响了警报声Java基础语法数组和方法工信部我国已建成5G基站近160万个库克赢麻了,5款iPhone续航表现均不错,第一名超过16个央媒替华为打广告?超过2。2亿台设备安装鸿蒙,想取缔Wind2021。12。18最新电脑配置推荐与方向未来科技的五大发展趋势巴以冲突中90的火箭炮被拦截,火箭炮难道这么不堪一击?公司倒闭女友分手负债百万,最惨共享单车创始人告诉你什么叫九死
倾听生命荣耀X10核心配置、证件照曝光:麒麟820、主摄配IMX60盛气凌人的近义词反义词及造句1996年,唐灵生为给中国夺下一块金牌,举重苦撑12秒,伟大孕晚期每天需要摄入的营养日本最美女优十大最好看的女星排行白领九招有效防电脑辐射本周大新闻索尼PSVR2体验首次公开MetaQuestPro国足7球大胜关岛,武磊阿兰皆双响,请问谁更优秀?字典我想对你说微型实验在初中化学教学中的应用华为商城开卖认证二手机P302799元起

友情链接:中准网聚热点快百科快传网快生活快软网快好知文好找江西南阳嘉兴昆明铜陵滨州广东西昌常德梅州兰州阳江运城金华广西萍乡大理重庆诸暨泉州安庆南充武汉辽宁