1. How the application pulls configuration from the config center at startup

The previous section introduced the extension points for configuration loading. This section walks through them using Nacos as the concrete config center.

NacosConfigBootstrapConfiguration is a BootstrapConfiguration class, so it is loaded while the bootstrap SpringApplication is being created. It declares two beans: NacosConfigManager and NacosPropertySourceLocator.

The core job of NacosConfigManager is to create the NacosConfigService, which pulls configuration from the remote config center:

    private String getConfigInner(String tenant, String dataId, String group, long timeoutMs)
            throws NacosException {
        group = null2defaultGroup(group);
        ParamUtils.checkKeyParam(dataId, group);
        ConfigResponse cr = new ConfigResponse();
        cr.setDataId(dataId);
        cr.setTenant(tenant);
        cr.setGroup(group);
        // some code omitted
        try {
            // fetch the configuration from the remote server
            String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs);
            cr.setContent(ct[0]);
            configFilterChainManager.doFilter(null, cr);
            content = cr.getContent();
        } catch (NacosException ioe) {
            // handling omitted in this excerpt
        }
        content = cr.getContent();
    }

NacosPropertySourceLocator implements PropertySourceLocator, which the previous section covered in detail:

    @Override
    public PropertySource<?> locate(Environment env) {
        nacosConfigProperties.setEnvironment(env);
        ConfigService configService = nacosConfigManager.getConfigService();
        // some code omitted
        nacosPropertySourceBuilder = new NacosPropertySourceBuilder(configService, timeout);
        CompositePropertySource composite =
                new CompositePropertySource(NACOS_PROPERTY_SOURCE_NAME);
        loadSharedConfiguration(composite);
        loadExtConfiguration(composite);
        loadApplicationConfiguration(composite, dataIdPrefix, nacosConfigProperties, env);
    }

2. How the application is notified when configuration changes in the config center

NacosConfigAutoConfiguration is an EnableAutoConfiguration class, so it is loaded while the application's SpringApplication is being created. It registers two important beans, NacosContextRefresher and NacosConfigManager. NacosConfigManager was covered above.
The main job of NacosContextRefresher is to register the Nacos listeners:

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        // many Spring context
        if (this.ready.compareAndSet(false, true)) {
            this.registerNacosListenersForApplications();
        }
    }

    private void registerNacosListener(final String groupKey, final String dataKey) {
        String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
        Listener listener = listenerMap.computeIfAbsent(key,
                lst -> new AbstractSharedListener() {
                    @Override
                    public void innerReceive(String dataId, String group, String configInfo) {
                        refreshCountIncrement();
                        nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                        // todo feature: support single refresh for listening
                        // when the configuration changes, publish a RefreshEvent
                        applicationContext.publishEvent(
                                new RefreshEvent(this, null, "Refresh Nacos config"));
                        if (log.isDebugEnabled()) {
                            log.debug(String.format(
                                    "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                                    group, dataId, configInfo));
                        }
                    }
                });
        try {
            // register the listener
            configService.addListener(dataKey, groupKey, listener);
        } catch (NacosException e) {
            // handling omitted in this excerpt
        }
    }

How does the listener get triggered?

Nacos uses long polling to fetch configuration: the client asks the server whether any config file has changed and, if so, triggers the listeners.

When NacosConfigService is created it creates a ClientWorker, which in turn sets up the long-polling tasks:

    public NacosConfigService(Properties properties) throws NacosException {
        // other code omitted
        this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties);
    }

    public ClientWorker(final HttpAgent agent,
                        final ConfigFilterChainManager configFilterChainManager,
                        final Properties properties) {
        // Schedule a recurring task: the first run happens after 1 ms, and
        // checkConfigInfo runs again 10 ms after each previous run finishes
        this.executor.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                try {
                    checkConfigInfo();
                } catch (Throwable e) {
                    LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e);
                }
            }
        }, 1L, 10L, TimeUnit.MILLISECONDS);
    }

    public void checkConfigInfo() {
        // Dispatch taskes.
        int listenerSize = cacheMap.get().size();
        // Round up the longingTaskCount.
        int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize());
        if (longingTaskCount > currentLongingTaskCount) {
            for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) {
                // The task list is no order. So it maybe has issues when changing.
                executorService.execute(new LongPollingRunnable(i));
            }
            currentLongingTaskCount = longingTaskCount;
        }
    }
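The rounding in checkConfigInfo is easy to verify with small numbers. The following is a minimal standalone sketch, not Nacos source: the class name TaskCountSketch and the method longPollingTaskCount are hypothetical, and the per-task capacity of 3000 is an assumption chosen for illustration. It shows that one LongPollingRunnable is started per (assumed) 3000 cached configs, rounded up.

```java
// Standalone sketch, not Nacos source: mirrors the round-up used in checkConfigInfo.
class TaskCountSketch {

    // perTaskConfigSize is a double so the division is not truncated before ceil().
    static int longPollingTaskCount(int listenerSize, double perTaskConfigSize) {
        return (int) Math.ceil(listenerSize / perTaskConfigSize);
    }

    public static void main(String[] args) {
        double perTask = 3000.0; // assumed capacity of one long-polling task
        System.out.println(longPollingTaskCount(0, perTask));    // 0
        System.out.println(longPollingTaskCount(1, perTask));    // 1
        System.out.println(longPollingTaskCount(3000, perTask)); // 1
        System.out.println(longPollingTaskCount(3001, perTask)); // 2
    }
}
```

Because checkConfigInfo only starts tasks when the computed count exceeds the current count, the number of long-polling tasks grows as listeners are added but is not reduced by this method.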
Next, look at LongPollingRunnable:

    @Override
    public void run() {
        // other code omitted
        // check server config: ask the server whether any cached config file has changed
        List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
        if (!CollectionUtils.isEmpty(changedGroupKeys)) {
            LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
        }
    }

checkUpdateDataIds builds a probe string containing the dataId, group, MD5 and (if present) tenant of every cached config, then hands it to checkUpdateConfigStr:

    List<String> checkUpdateDataIds(List<CacheData> cacheDatas,
                                    List<String> inInitializingCacheList) throws Exception {
        StringBuilder sb = new StringBuilder();
        for (CacheData cacheData : cacheDatas) {
            if (!cacheData.isUseLocalConfigInfo()) {
                sb.append(cacheData.dataId).append(WORD_SEPARATOR);
                sb.append(cacheData.group).append(WORD_SEPARATOR);
                if (StringUtils.isBlank(cacheData.tenant)) {
                    sb.append(cacheData.getMd5()).append(LINE_SEPARATOR);
                } else {
                    sb.append(cacheData.getMd5()).append(WORD_SEPARATOR);
                    sb.append(cacheData.getTenant()).append(LINE_SEPARATOR);
                }
                if (cacheData.isInitializing()) {
                    // It updates when cacheData occours in cacheMap by first time.
                    inInitializingCacheList.add(GroupKey.getKeyTenant(
                            cacheData.dataId, cacheData.group, cacheData.tenant));
                }
            }
        }
        boolean isInitializingCacheList = !inInitializingCacheList.isEmpty();
        return checkUpdateConfigStr(sb.toString(), isInitializingCacheList);
    }

    List<String> checkUpdateConfigStr(String probeUpdateString,
                                      boolean isInitializingCacheList) throws Exception {
        Map<String, String> params = new HashMap<String, String>(2);
        params.put(Constants.PROBE_MODIFY_REQUEST, probeUpdateString);
        Map<String, String> headers = new HashMap<String, String>(2);
        headers.put("Long-Pulling-Timeout", "" + timeout);
        try {
            // In order to prevent the server from handling the delay of the client's long task,
            // increase the client's read timeout to avoid this problem.
            // Ask the server whether any config has changed; the client read timeout
            // must be longer than the long-polling timeout.
            long readTimeoutMs = timeout + (long) Math.round(timeout >> 1);
            HttpRestResult<String> result = agent.httpPost(
                    Constants.CONFIG_CONTROLLER_PATH + "/listener",
                    headers, params, agent.getEncode(), readTimeoutMs);
            if (result.ok()) {
                setHealthServer(true);
                return parseUpdateDataIdResponse(result.getData());
            } else {
                setHealthServer(false);
            }
        } catch (Exception e) {
            setHealthServer(false);
        }
        return Collections.emptyList();
    }

Returning to run(), the fuller flow after the probe is:

    @Override
    public void run() {
        try {
            // check server config
            List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList);
            if (!CollectionUtils.isEmpty(changedGroupKeys)) {
                LOGGER.info("get changedGroupKeys:" + changedGroupKeys);
            }
            // If a config file changed on the server, pull it from the server again
            for (String groupKey : changedGroupKeys) {
                // parsing of dataId, group and tenant from groupKey is omitted in this excerpt
                try {
                    String[] ct = getServerConfig(dataId, group, tenant, 3000L);
                    CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant));
                    cache.setContent(ct[0]);
                    if (null != ct[1]) {
                        cache.setType(ct[1]);
                    }
                } catch (NacosException ioe) {
                    // handling omitted in this excerpt
                }
            }
            for (CacheData cacheData : cacheDatas) {
                if (!cacheData.isInitializing() || inInitializingCacheList.contains(
                        GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) {
                    // If the MD5 differs, trigger the listeners
                    cacheData.checkListenerMd5();
                    cacheData.setInitializing(false);
                }
            }
            inInitializingCacheList.clear();
            // immediately start the next long-polling round
            executorService.execute(this);
        } catch (Throwable e) {
            // on failure, retry after a penalty delay
            executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
        }
    }

If the MD5 of the content differs from the MD5 a listener was last called with, that listener is triggered:

    void checkListenerMd5() {
        for (ManagerListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                safeNotifyListener(dataId, group, content, type, md5, wrap);
            }
        }
    }

    private void safeNotifyListener(final String dataId, final String group,
                                    final String content, final String type,
                                    final String md5, final ManagerListenerWrap listenerWrap) {
        final Listener listener = listenerWrap.listener;
        Runnable job = new Runnable() {
            @Override
            public void run() {
                try {
                    ConfigResponse cr = new ConfigResponse();
                    cr.setDataId(dataId);
                    cr.setGroup(group);
                    cr.setContent(content);
                    configFilterChainManager.doFilter(null, cr);
                    String contentTmp = cr.getContent();
                    // invoke the listener callback
                    listener.receiveConfigInfo(contentTmp);
                    listenerWrap.lastCallMd5 = md5;
                } catch (NacosException ex) {
                    // handling omitted in this excerpt
                }
            }
        };
        try {
            // run on the listener's own executor if it provides one, otherwise inline
            if (null != listener.getExecutor()) {
                listener.getExecutor().execute(job);
            } else {
                job.run();
            }
        } catch (Throwable t) {
            // handling omitted in this excerpt
        }
    }
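The MD5 comparison in checkListenerMd5 can be illustrated in isolation. The sketch below is a simplified standalone model, not Nacos source: Md5NotifySketch and ListenerWrap are hypothetical stand-ins for CacheData and ManagerListenerWrap, the listener is reduced to a list of received contents, and the empty initial lastCallMd5 is an assumption made for the example.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;

// Standalone sketch, not Nacos source: a listener is notified only when the
// content MD5 differs from the MD5 it was last called with.
class Md5NotifySketch {

    static String md5Hex(String content) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(content.getBytes(StandardCharsets.UTF_8));
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Hypothetical stand-in for ManagerListenerWrap.
    static class ListenerWrap {
        String lastCallMd5 = ""; // assumption: nothing delivered yet
        final List<String> received = new ArrayList<>();
    }

    private String content = "";
    private String md5 = md5Hex("");
    private final List<ListenerWrap> listeners = new ArrayList<>();

    void addListener(ListenerWrap wrap) {
        listeners.add(wrap);
    }

    void setContent(String newContent) {
        content = newContent;
        md5 = md5Hex(newContent);
    }

    void checkListenerMd5() {
        for (ListenerWrap wrap : listeners) {
            if (!md5.equals(wrap.lastCallMd5)) {
                wrap.received.add(content); // stands in for listener.receiveConfigInfo(...)
                wrap.lastCallMd5 = md5;     // remember what was delivered
            }
        }
    }

    public static void main(String[] args) {
        Md5NotifySketch cache = new Md5NotifySketch();
        ListenerWrap wrap = new ListenerWrap();
        cache.addListener(wrap);

        cache.setContent("a=1");
        cache.checkListenerMd5(); // content is new: listener is notified
        cache.checkListenerMd5(); // same MD5: no second notification
        cache.setContent("a=2");
        cache.checkListenerMd5(); // content changed: listener is notified again

        System.out.println(wrap.received); // [a=1, a=2]
    }
}
```

Tracking the last delivered MD5 per listener, rather than per config, means a listener registered late still receives the current content once, while listeners that already saw it are left alone.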
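The callback finally lands in the listener registered by NacosContextRefresher:

    public void innerReceive(String dataId, String group, String configInfo) {
        refreshCountIncrement();
        nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
        // todo feature: support single refresh for listening
        // publish a RefreshEvent
        applicationContext.publishEvent(
                new RefreshEvent(this, null, "Refresh Nacos config"));
        if (log.isDebugEnabled()) {
            log.debug(String.format(
                    "Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                    group, dataId, configInfo));
        }
    }

3. Summary

Long polling is the core of detecting whether a config file has changed. The client side is fairly simple: it only needs to set a read timeout longer than the long-polling timeout. A later article will explore how long polling is implemented on the Nacos server side.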
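As a worked example of the longer read timeout, checkUpdateConfigStr adds half of the long-polling timeout on top of the timeout itself. The sketch below is standalone and illustrative only: ReadTimeoutSketch is a hypothetical class, and the 30000 ms long-polling timeout is an assumed value, not one taken from this article.

```java
// Standalone sketch, not Nacos source: read timeout = long-polling timeout * 1.5.
class ReadTimeoutSketch {

    // timeout >> 1 is integer division by two, so the client waits 50% longer
    // than the period the server is allowed to hold the request.
    static long readTimeoutMs(long longPollingTimeoutMs) {
        return longPollingTimeoutMs + (longPollingTimeoutMs >> 1);
    }

    public static void main(String[] args) {
        long timeout = 30000L; // assumed long-polling timeout in milliseconds
        System.out.println(readTimeoutMs(timeout)); // 45000
    }
}
```

The extra margin keeps the HTTP read from timing out on the client before the server has had the chance to answer a request it held for the full long-polling period.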