Nacos Config Auto-Refresh: A Source Code Walkthrough

Table of Contents

  • I. Preface
  • II. Source Code Analysis
    • 1. How the Nacos client listens for config changes on the server
    • 2. How beans annotated with @ConfigurationProperties are auto-refreshed
    • 3. How beans annotated with @RefreshScope are auto-refreshed
  • III. Summary

I. Preface

I recently got curious about how Nacos manages to refresh configuration automatically, so I stepped through the source code with a debugger. The version used here is Nacos 2.2.1.

II. Source Code Analysis

1. How the Nacos client listens for config changes on the server

After I changed a configuration on the Nacos server, the client printed a log like the following:

(screenshot of the client log reporting the config change, omitted)

The log mentions the ClientWorker class, so that is where the debugging starts. First, the ClientWorker constructor:

//ClientWorker
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
        final NacosClientProperties properties) throws NacosException {
    this.configFilterChainManager = configFilterChainManager;
    //initialize some parameters
    init(properties);
    agent = new ConfigRpcTransportClient(properties, serverListManager);
    int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
    ScheduledExecutorService executorService = Executors.newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM),
            r -> {
                Thread t = new Thread(r);
                t.setName("com.alibaba.nacos.client.Worker");
                t.setDaemon(true);
                return t;
            });
    agent.setExecutor(executorService);
    agent.start();
}

Step into agent.start():

//ConfigRpcTransportClient
public void start() throws NacosException {
    securityProxy.login(this.properties);
    this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,
            this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
    startInternal();
}

Follow startInternal() into the subclass ConfigRpcTransportClient (an inner class of ClientWorker):

//ConfigRpcTransportClient
@Override
public void startInternal() {
    executor.schedule(() -> {
        while (!executor.isShutdown() && !executor.isTerminated()) {
            try {
                //if the blocking queue has an element, proceed immediately; otherwise wait up to 5 seconds
                listenExecutebell.poll(5L, TimeUnit.SECONDS);
                if (executor.isShutdown() || executor.isTerminated()) {
                    continue;
                }
                executeConfigListen();
            } catch (Throwable e) {
                LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
            }
        }
    }, 0L, TimeUnit.MILLISECONDS);
}

Next, step into executeConfigListen(), which is the core method:

//ClientWorker
@Override
public void executeConfigListen() {
    //listenCachesMap holds the caches to keep listening on; removeListenCachesMap holds the discarded ones
    Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
    Map<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16);
    //current timestamp
    long now = System.currentTimeMillis();
    //a full sync is needed if more than 5 minutes (ALL_SYNC_INTERNAL) have passed since the last full sync
    boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
    for (CacheData cache : cacheMap.get().values()) {
        synchronized (cache) {
            //check local listeners consistent.
            //isSyncWithServer() is false in 3 cases; when it is false the continue below is skipped,
            //so all of the following checks run:
            //1. a listener was just added (default is false)
            //2. a config change notification was received (set to false)
            //3. a listener was removed (set to false)
            if (cache.isSyncWithServer()) {
                cache.checkListenerMd5();
                if (!needAllSync) {
                    //if no full sync is needed, skip; listenCachesMap and removeListenCachesMap stay empty
                    continue;
                }
            }
            if (!cache.isDiscard()) {
                //if the cache is not discarded, group it into listenCachesMap by taskId
                //get listen  config
                if (!cache.isUseLocalConfigInfo()) {
                    List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
                    if (cacheDatas == null) {
                        cacheDatas = new LinkedList<>();
                        listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                    }
                    cacheDatas.add(cache);
                }
            } else if (cache.isDiscard()) {
                //if the cache is discarded, group it into removeListenCachesMap by taskId
                if (!cache.isUseLocalConfigInfo()) {
                    List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
                    if (cacheDatas == null) {
                        cacheDatas = new LinkedList<>();
                        removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
                    }
                    cacheDatas.add(cache);
                }
            }
        }
    }
    //whether any key changed
    boolean hasChangedKeys = false;
    //if listenCachesMap is not empty, take this branch
    if (!listenCachesMap.isEmpty()) {
        for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
            String taskId = entry.getKey();
            //records the last-modified timestamps
            Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
            List<CacheData> listenCaches = entry.getValue();
            for (CacheData cacheData : listenCaches) {
                //for each cache, record its last-modified timestamp in timestampMap
                timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),
                        cacheData.getLastModifiedTs().longValue());
            }
            //send the md5 of each cache to the server
            ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
            configChangeListenRequest.setListen(true);
            try {
                RpcClient rpcClient = ensureRpcClient(taskId);
                ConfigChangeBatchListenResponse configChangeBatchListenResponse =
                        (ConfigChangeBatchListenResponse) requestProxy(rpcClient, configChangeListenRequest);
                if (configChangeBatchListenResponse.isSuccess()) {
                    //collect the changed keys returned by the server
                    Set<String> changeKeys = new HashSet<>();
                    //handle changed keys,notify listener
                    if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
                        //if there are changed keys, mark hasChangedKeys as true
                        hasChangedKeys = true;
                        for (ConfigChangeBatchListenResponse.ConfigContext changeConfig
                                : configChangeBatchListenResponse.getChangedConfigs()) {
                            String changeKey = GroupKey.getKeyTenant(changeConfig.getDataId(),
                                    changeConfig.getGroup(), changeConfig.getTenant());
                            //record the changed key
                            changeKeys.add(changeKey);
                            //call refreshContentAndCheck for the follow-up processing
                            refreshContentAndCheck(changeKey);
                        }
                    }
                    //handler content configs
                    for (CacheData cacheData : listenCaches) {
                        String groupKey = GroupKey.getKeyTenant(cacheData.dataId, cacheData.group,
                                cacheData.getTenant());
                        if (!changeKeys.contains(groupKey)) {
                            //for keys that did not change and still have listeners, set syncWithServer to true
                            //as long as the last-modified timestamp has not moved in the meantime
                            //sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
                            synchronized (cacheData) {
                                if (!cacheData.getListeners().isEmpty()) {
                                    Long previousTimesStamp = timestampMap.get(groupKey);
                                    if (previousTimesStamp != null && !cacheData.getLastModifiedTs()
                                            .compareAndSet(previousTimesStamp, System.currentTimeMillis())) {
                                        continue;
                                    }
                                    cacheData.setSyncWithServer(true);
                                }
                            }
                        }
                        cacheData.setInitializing(false);
                    }
                }
            } catch (Exception e) {
                LOGGER.error("Async listen config change error ", e);
                try {
                    Thread.sleep(50L);
                } catch (InterruptedException interruptedException) {
                    //ignore
                }
            }
        }
    }
    if (!removeListenCachesMap.isEmpty()) {
        //caches whose listeners should be removed
        for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
            String taskId = entry.getKey();
            List<CacheData> removeListenCaches = entry.getValue();
            ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
            configChangeListenRequest.setListen(false);
            try {
                RpcClient rpcClient = ensureRpcClient(taskId);
                boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
                if (removeSuccess) {
                    for (CacheData cacheData : removeListenCaches) {
                        synchronized (cacheData) {
                            if (cacheData.isDiscard()) {
                                ClientWorker.this.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
                            }
                        }
                    }
                }
            } catch (Exception e) {
                LOGGER.error("async remove listen config change error ", e);
            }
            try {
                Thread.sleep(50L);
            } catch (InterruptedException interruptedException) {
                //ignore
            }
        }
    }
    if (needAllSync) {
        lastAllSyncTime = now;
    }
    //If has changed keys,notify re sync md5.
    if (hasChangedKeys) {
        notifyListenConfig();
    }
}

Under normal conditions executeConfigListen() is entered once every 5 seconds. The method can be broken down into the following steps:

  1. Create listenCachesMap and removeListenCachesMap, which hold the caches to keep listening on and the caches marked as discarded, respectively
  2. Check whether 5 minutes or more have passed since the last full sync; if so, a full sync is needed, otherwise it is not
  3. Loop over every cache and check whether it is already in sync with the server. If not, it must go through the later checks. If it is in sync, call cache.checkListenerMd5() to compare the md5 with each listener's last md5 (more on what that method does below), then skip the cache unless a full sync is needed. Caches that need checking are grouped by taskId and, depending on whether they are discarded, placed into listenCachesMap or removeListenCachesMap (a single taskId holds at most 3000 caches; this is handled when entries are put into cacheMap, see ParamUtil.getPerTaskConfigSize())
  4. If listenCachesMap is not empty, loop over it, build a ConfigBatchListenRequest, get the gRPC client for the taskId, send the request to the server to obtain the changed keys, and call refreshContentAndCheck(changeKey) for each of them (covered separately below)
  5. Similarly, loop over removeListenCachesMap, build a ConfigBatchListenRequest, tell the server to stop listening, and remove the local cache
  6. If this round was a full sync, update the last full-sync timestamp
  7. If any key changed, call notifyListenConfig(), which puts an element into the blocking queue so that the next executeConfigListen() run is triggered immediately and cache.checkListenerMd5() can sync the md5 values. For caches that did not change, syncWithServer has been set to true, so the next run skips their checks (a minimal sketch of this wake-up pattern follows this list)
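
To make the 5-second polling loop and the wake-up trick easier to picture, here is a minimal self-contained sketch of the same "poll with timeout + offer to wake up" pattern. It is illustrative code, not Nacos source; the names ListenBellDemo and listenExecuteBell, and the method bodies, are made up.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ListenBellDemo {

    // capacity 1 is enough: the element only acts as a wake-up signal
    private final BlockingQueue<Object> listenExecuteBell = new ArrayBlockingQueue<>(1);

    // worker loop: runs a check every 5 seconds, or immediately after a notification
    public void loop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            // returns early if someone called notifyListenConfig(), otherwise times out after 5s
            listenExecuteBell.poll(5L, TimeUnit.SECONDS);
            executeConfigListen();
        }
    }

    // called from the server-push handler to trigger an immediate re-check
    public void notifyListenConfig() {
        // offer never blocks; if a signal is already queued, this one is simply dropped
        listenExecuteBell.offer(new Object());
    }

    private void executeConfigListen() {
        System.out.println("checking config md5 against server ...");
    }
}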

One more thing worth mentioning: when the gRPC connection to the server is established, a handler is registered to process server-pushed requests, and the server pushes a notification whenever a configuration changes. Let's look at that code, in the initRpcClientHandler method of the ClientWorker class:

//ClientWorker
private void initRpcClientHandler(final RpcClient rpcClientInner) {
    /*
     * Register Config Change /Config ReSync Handler
     */
    rpcClientInner.registerServerRequestHandler((request) -> {
        if (request instanceof ConfigChangeNotifyRequest) {
            ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
            LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",
                    rpcClientInner.getName(), configChangeNotifyRequest.getDataId(),
                    configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
            String groupKey = GroupKey.getKeyTenant(configChangeNotifyRequest.getDataId(),
                    configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
            CacheData cacheData = cacheMap.get().get(groupKey);
            if (cacheData != null) {
                synchronized (cacheData) {
                    cacheData.getLastModifiedTs().set(System.currentTimeMillis());
                    cacheData.setSyncWithServer(false);
                    notifyListenConfig();
                }
            }
            return new ConfigChangeNotifyResponse();
        }
        return null;
    });
    //... rest omitted
}

As you can see, when the client receives this server-pushed request, it updates the cache's last-modified timestamp to the current time, sets syncWithServer to false, and puts an element into the blocking queue. That immediately triggers another executeConfigListen() run, and because syncWithServer is now false, the check is actually performed.

Now let's go back and look at the refreshContentAndCheck method:

//ClientWorker
private void refreshContentAndCheck(CacheData cacheData, boolean notify) {
    try {
        ConfigResponse response = getServerConfig(cacheData.dataId, cacheData.group, cacheData.tenant, 3000L,
                notify);
        cacheData.setEncryptedDataKey(response.getEncryptedDataKey());
        cacheData.setContent(response.getContent());
        if (null != response.getConfigType()) {
            cacheData.setType(response.getConfigType());
        }
        if (notify) {
            LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}",
                    agent.getName(), cacheData.dataId, cacheData.group, cacheData.tenant, cacheData.getMd5(),
                    ContentUtils.truncateContent(response.getContent()), response.getConfigType());
        }
        cacheData.checkListenerMd5();
    } catch (Exception e) {
        LOGGER.error("refresh content and check md5 fail ,dataId={},group={},tenant={} ", cacheData.dataId,
                cacheData.group, cacheData.tenant, e);
    }
}

Inside getServerConfig, a request is sent to the server to fetch the latest config content, i.e. response.getContent(), which is then written into the CacheData. After that cacheData.checkListenerMd5() is called. Let's keep following:

//CacheData
void checkListenerMd5() {
    for (ManagerListenerWrap wrap : listeners) {
        if (!md5.equals(wrap.lastCallMd5)) {
            safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
        }
    }
}

Here the cache's md5 is compared with the md5 recorded at each listener's last callback (ManagerListenerWrap is a thin wrapper around Listener). If they differ, safeNotifyListener is called.
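
For reference, the md5 values compared above are hex-encoded MD5 digests of the config content. Below is a small standalone sketch of computing and comparing such digests with plain JDK APIs; it is not the exact Nacos utility class, just the general idea.

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class Md5Demo {

    // hex-encoded MD5 of a config payload
    static String md5Hex(String content) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5")
                .digest(content.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        String lastCallMd5 = md5Hex("timeout=3000");
        String currentMd5 = md5Hex("timeout=5000");
        // a mismatch like this is what triggers safeNotifyListener in CacheData
        System.out.println("changed: " + !currentMd5.equals(lastCallMd5));
    }
}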

//CacheData
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
        final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
    final Listener listener = listenerWrap.listener;
    if (listenerWrap.inNotifying) {
        LOGGER.warn(
                "[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",
                name, dataId, group, md5, listener);
        return;
    }
    Runnable job = () -> {
        long start = System.currentTimeMillis();
        ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
        ClassLoader appClassLoader = listener.getClass().getClassLoader();
        try {
            if (listener instanceof AbstractSharedListener) {
                AbstractSharedListener adapter = (AbstractSharedListener) listener;
                adapter.fillContext(dataId, group);
                LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
            }
            // Before executing the callback, set the thread classloader to the classloader of
            // the specific webapp to avoid exceptions or misuses when calling the spi interface in
            // the callback method (this problem occurs only in multi-application deployment).
            Thread.currentThread().setContextClassLoader(appClassLoader);
            ConfigResponse cr = new ConfigResponse();
            cr.setDataId(dataId);
            cr.setGroup(group);
            cr.setContent(content);
            cr.setEncryptedDataKey(encryptedDataKey);
            configFilterChainManager.doFilter(null, cr);
            String contentTmp = cr.getContent();
            listenerWrap.inNotifying = true;
            listener.receiveConfigInfo(contentTmp);
            // compare lastContent and content
            if (listener instanceof AbstractConfigChangeListener) {
                Map<String, ConfigChangeItem> data = ConfigChangeHandler.getInstance()
                        .parseChangeData(listenerWrap.lastContent, contentTmp, type);
                ConfigChangeEvent event = new ConfigChangeEvent(data);
                ((AbstractConfigChangeListener) listener).receiveConfigChange(event);
                listenerWrap.lastContent = contentTmp;
            }
            listenerWrap.lastCallMd5 = md5;
            LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ,cost={} millis.", name, dataId,
                    group, md5, listener, (System.currentTimeMillis() - start));
        } catch (NacosException ex) {
            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name,
                    dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg());
        } catch (Throwable t) {
            LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={}", name, dataId, group, md5,
                    listener, t);
        } finally {
            listenerWrap.inNotifying = false;
            Thread.currentThread().setContextClassLoader(myClassLoader);
        }
    };
    final long startNotify = System.currentTimeMillis();
    try {
        if (null != listener.getExecutor()) {
            listener.getExecutor().execute(job);
        } else {
            try {
                INTERNAL_NOTIFIER.submit(job);
            } catch (RejectedExecutionException rejectedExecutionException) {
                LOGGER.warn(
                        "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, no available internal notifier,will sync notifier ",
                        name, dataId, group, md5, listener);
                job.run();
            } catch (Throwable throwable) {
                LOGGER.error(
                        "[{}] [notify-blocked] dataId={}, group={}, md5={}, listener={}, submit internal async task fail,throwable= ",
                        name, dataId, group, md5, listener, throwable);
                job.run();
            }
        }
    } catch (Throwable t) {
        LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId,
                group, md5, listener, t.getCause());
    }
    final long finishNotify = System.currentTimeMillis();
    LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ",
            name, (finishNotify - startNotify), dataId, group, md5, listener);
}

The core call here is listener.receiveConfigInfo(contentTmp). Step into it, into the implementation class AbstractSharedListener:

//AbstractSharedListener
@Override
public final void receiveConfigInfo(String configInfo) {
    innerReceive(dataId, group, configInfo);
}

Follow innerReceive and we arrive at the innerReceive method of the anonymous inner class defined in NacosContextRefresher's registerNacosListener method:

//NacosContextRefresher
private void registerNacosListener(final String groupKey, final String dataKey) {
    String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
    Listener listener = listenerMap.computeIfAbsent(key,
            lst -> new AbstractSharedListener() {
                @Override
                public void innerReceive(String dataId, String group, String configInfo) {
                    refreshCountIncrement();
                    nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
                    applicationContext.publishEvent(new RefreshEvent(this, null, "Refresh Nacos config"));
                    if (log.isDebugEnabled()) {
                        log.debug(String.format("Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
                                group, dataId, configInfo));
                    }
                }
            });
    try {
        configService.addListener(dataKey, groupKey, listener);
        log.info("[Nacos Config] Listening config: dataId={}, group={}", dataKey, groupKey);
    }
    catch (NacosException e) {
        log.warn(String.format("register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey, groupKey), e);
    }
}

Inside the method, a RefreshEvent is published. The listener for this event is RefreshEventListener (some familiarity with Spring's event publishing mechanism helps here; a minimal sketch follows), so let's continue into RefreshEventListener's onApplicationEvent method.
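
As a quick refresher on Spring's event mechanism, here is a minimal, hypothetical publish/listen example. MyEvent, MyListener and MyPublisher are invented names and are not part of Nacos or Spring Cloud; the point is only that publishing through the ApplicationContext calls back every ApplicationListener whose generic type matches.

import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

// a custom event type (hypothetical, for illustration only)
class MyEvent extends ApplicationEvent {
    MyEvent(Object source) {
        super(source);
    }
}

// any bean implementing ApplicationListener<MyEvent> receives the event
@Component
class MyListener implements ApplicationListener<MyEvent> {
    @Override
    public void onApplicationEvent(MyEvent event) {
        System.out.println("received event from " + event.getSource());
    }
}

// publishing side: inject ApplicationEventPublisher (the ApplicationContext itself implements it)
@Component
class MyPublisher {
    private final ApplicationEventPublisher publisher;

    MyPublisher(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }

    void fire() {
        publisher.publishEvent(new MyEvent(this));
    }
}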

//RefreshEventListener
@Override
public void onApplicationEvent(ApplicationEvent event) {
    if (event instanceof ApplicationReadyEvent) {
        handle((ApplicationReadyEvent) event);
    }
    else if (event instanceof RefreshEvent) {
        handle((RefreshEvent) event);
    }
}

Because the event is a RefreshEvent, the second handle method below is invoked:

//RefreshEventListener
public void handle(RefreshEvent event) {
    if (this.ready.get()) { // don't handle events before app is ready
        log.debug("Event received " + event.getEventDesc());
        Set<String> keys = this.refresh.refresh();
        log.info("Refresh keys changed: " + keys);
    }
}

Step into refresh.refresh():

//ContextRefresher
public synchronized Set<String> refresh() {
    Set<String> keys = refreshEnvironment();
    this.scope.refreshAll();
    return keys;
}

This does two things: refreshEnvironment() and scope.refreshAll().

These two calls are where beans annotated with @ConfigurationProperties and beans annotated with @RefreshScope are handled, respectively.

2. How beans annotated with @ConfigurationProperties are auto-refreshed

Let's look at refreshEnvironment() first:

//ContextRefresher
public synchronized Set<String> refreshEnvironment() {
    Map<String, Object> before = extract(this.context.getEnvironment().getPropertySources());
    updateEnvironment();
    Set<String> keys = changes(before, extract(this.context.getEnvironment().getPropertySources())).keySet();
    this.context.publishEvent(new EnvironmentChangeEvent(this.context, keys));
    return keys;
}

This method first snapshots the current properties into a map, refreshes the environment, takes another snapshot, and diffs the two to work out which keys changed — a fairly routine environment refresh. It then publishes an EnvironmentChangeEvent. That event is an extension point: if you need to do something whenever the environment is refreshed, you can register your own listener for EnvironmentChangeEvent (a minimal illustration follows). Spring Cloud itself ships one such listener, ConfigurationPropertiesRebinder, whose onApplicationEvent method we look at next.
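
For example, a hypothetical listener that simply logs the changed keys could look like this (ConfigChangeAuditListener is an invented name, not something Spring Cloud provides):

import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

// hypothetical listener: reacts to every environment refresh triggered by a config change
@Component
class ConfigChangeAuditListener implements ApplicationListener<EnvironmentChangeEvent> {

    @Override
    public void onApplicationEvent(EnvironmentChangeEvent event) {
        // event.getKeys() contains the property keys whose values changed in this refresh
        event.getKeys().forEach(key -> System.out.println("property changed: " + key));
    }
}

Back to the listener Spring Cloud does provide, ConfigurationPropertiesRebinder: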

//ConfigurationPropertiesRebinder
@Override
public void onApplicationEvent(EnvironmentChangeEvent event) {
    if (this.applicationContext.equals(event.getSource())
            // Backwards compatible
            || event.getKeys().equals(event.getSource())) {
        rebind();
    }
}

Step into rebind():

//ConfigurationPropertiesRebinder
@ManagedOperation
public void rebind() {
    this.errors.clear();
    for (String name : this.beans.getBeanNames()) {
        rebind(name);
    }
}

This loops over every bean name. What exactly this beans field is, we'll look at a bit later; first let's continue into the single-argument rebind(name) method:

//ConfigurationPropertiesRebinder
@ManagedOperation
public boolean rebind(String name) {
    if (!this.beans.getBeanNames().contains(name)) {
        return false;
    }
    ApplicationContext appContext = this.applicationContext;
    while (appContext != null) {
        if (appContext.containsLocalBean(name)) {
            return rebind(name, appContext);
        }
        else {
            appContext = appContext.getParent();
        }
    }
    return false;
}

Continue into rebind(name, appContext):

private boolean rebind(String name, ApplicationContext appContext) {
    try {
        Object bean = appContext.getBean(name);
        if (AopUtils.isAopProxy(bean)) {
            bean = ProxyUtils.getTargetObject(bean);
        }
        if (bean != null) {
            // TODO: determine a more general approach to fix this.
            // see https://github.com/spring-cloud/spring-cloud-commons/issues/571
            if (getNeverRefreshable().contains(bean.getClass().getName())) {
                return false; // ignore
            }
            appContext.getAutowireCapableBeanFactory().destroyBean(bean);
            appContext.getAutowireCapableBeanFactory().initializeBean(bean, name);
            return true;
        }
    }
    catch (RuntimeException e) {
        this.errors.put(name, e);
        throw e;
    }
    catch (Exception e) {
        this.errors.put(name, e);
        throw new IllegalStateException("Cannot rebind to " + name, e);
    }
    return false;
}

This makes it clear: the bean is destroyed and then re-initialized. The getNeverRefreshable() method returns class names that should never be refreshed; by default it contains com.zaxxer.hikari.HikariDataSource, and you can configure your own via the spring.cloud.refresh.never-refreshable property. Note that every bean held in beans seems to be destroyed and re-initialized on each refresh, whether or not its properties actually changed; the code even carries a TODO pointing at spring-cloud-commons issue 571 for a more general fix, and I'm not sure whether that has been resolved since. So what exactly is this beans field? It is a ConfigurationPropertiesBeans object, and its getBeanNames() method simply returns the keySet of an internal map:

//ConfigurationPropertiesBeans
private Map<String, ConfigurationPropertiesBean> beans = new HashMap<>();

public Set<String> getBeanNames() {
    return new HashSet<>(this.beans.keySet());
}

Now, how do ConfigurationPropertiesBean instances get into that map? ConfigurationPropertiesBeans implements the BeanPostProcessor interface, so, as anyone familiar with Spring internals knows, its postProcessBeforeInitialization method is invoked before each bean is initialized. Let's look at it:

//ConfigurationPropertiesBeans
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    if (isRefreshScoped(beanName)) {
        return bean;
    }
    ConfigurationPropertiesBean propertiesBean = ConfigurationPropertiesBean.get(this.applicationContext, bean,
            beanName);
    if (propertiesBean != null) {
        this.beans.put(beanName, propertiesBean);
    }
    return bean;
}

As you can see, whenever ConfigurationPropertiesBean.get(this.applicationContext, bean, beanName) returns a non-null value, the bean is put into the map. Step into that method:

//ConfigurationPropertiesBean
/**
 * Return a {@link ConfigurationPropertiesBean @ConfigurationPropertiesBean} instance
 * for the given bean details or {@code null} if the bean is not a
 * {@link ConfigurationProperties @ConfigurationProperties} object. Annotations are
 * considered both on the bean itself, as well as any factory method (for example a
 * {@link Bean @Bean} method).
 * @param applicationContext the source application context
 * @param bean the bean to consider
 * @param beanName the bean name
 * @return a configuration properties bean or {@code null} if the neither the bean nor
 * factory method are annotated with
 * {@link ConfigurationProperties @ConfigurationProperties}
 */
public static ConfigurationPropertiesBean get(ApplicationContext applicationContext, Object bean, String beanName) {
    Method factoryMethod = findFactoryMethod(applicationContext, beanName);
    Bindable<Object> bindTarget = createBindTarget(bean, bean.getClass(), factoryMethod);
    if (bindTarget == null) {
        return null;
    }
    bindTarget = bindTarget.withBindMethod(BindMethodAttribute.get(applicationContext, beanName));
    if (bindTarget.getBindMethod() == null && factoryMethod != null) {
        bindTarget = bindTarget.withBindMethod(JAVA_BEAN_BIND_METHOD);
    }
    if (bindTarget.getBindMethod() != VALUE_OBJECT_BIND_METHOD) {
        bindTarget = bindTarget.withExistingValue(bean);
    }
    return create(beanName, bean, bindTarget);
}

The Javadoc above the method already gives the answer away, but let's continue into createBindTarget anyway:

//ConfigurationPropertiesBean
private static Bindable<Object> createBindTarget(Object bean, Class<?> beanType, Method factoryMethod) {
    ResolvableType type = (factoryMethod != null) ? ResolvableType.forMethodReturnType(factoryMethod)
            : ResolvableType.forClass(beanType);
    Annotation[] annotations = findAnnotations(bean, beanType, factoryMethod);
    return (annotations != null) ? Bindable.of(type).withAnnotations(annotations) : null;
}

And then into findAnnotations:

//ConfigurationPropertiesBean
private static Annotation[] findAnnotations(Object instance, Class<?> type, Method factory) {
    ConfigurationProperties annotation = findAnnotation(instance, type, factory, ConfigurationProperties.class);
    if (annotation == null) {
        return null;
    }
    Validated validated = findAnnotation(instance, type, factory, Validated.class);
    return (validated != null) ? new Annotation[] { annotation, validated } : new Annotation[] { annotation };
}

Now it is obvious: the map holds every bean annotated with @ConfigurationProperties, and these are exactly the beans that get destroyed and re-initialized on refresh.
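
As a concrete, hypothetical example of a bean that would land in this map and be rebound on refresh (the prefix demo.mail and the fields are made up for illustration):

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// bound from properties such as demo.mail.host and demo.mail.port;
// on an EnvironmentChangeEvent this bean is destroyed and re-initialized,
// so its fields pick up the new values from the refreshed Environment
@Component
@ConfigurationProperties(prefix = "demo.mail")
public class MailProperties {

    private String host;
    private int port = 25;

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }
}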

3. How beans annotated with @RefreshScope are auto-refreshed

Before looking at scope.refreshAll(), let's first understand how a @RefreshScope bean is instantiated and initialized in Spring. Start with the @RefreshScope annotation itself:

@Target({ ElementType.TYPE, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Scope("refresh")
@Documented
public @interface RefreshScope {

    /**
     * Alias for {@link Scope#proxyMode}.
     * @see Scope#proxyMode()
     * @return proxy mode
     */
    @AliasFor(annotation = Scope.class)
    ScopedProxyMode proxyMode() default ScopedProxyMode.TARGET_CLASS;

}

It is a composite annotation meta-annotated with @Scope("refresh"), and the default ScopedProxyMode is TARGET_CLASS — keep that in mind, it matters later.

When ClassPathBeanDefinitionScanner scans beans, it creates a proxy BeanDefinition for each of these beans, so the instantiation phase ends up producing an extra bean: the original (target) definition is re-registered under a name prefixed with scopedTarget., while the proxy takes over the original name. Let's look at ClassPathBeanDefinitionScanner's doScan method:

(screenshot of ClassPathBeanDefinitionScanner#doScan, which calls AnnotationConfigUtils.applyScopedProxyMode, omitted)

The relevant part is the AnnotationConfigUtils.applyScopedProxyMode method:

//AnnotationConfigUtils
static BeanDefinitionHolder applyScopedProxyMode(
        ScopeMetadata metadata, BeanDefinitionHolder definition, BeanDefinitionRegistry registry) {

    ScopedProxyMode scopedProxyMode = metadata.getScopedProxyMode();
    if (scopedProxyMode.equals(ScopedProxyMode.NO)) {
        return definition;
    }
    boolean proxyTargetClass = scopedProxyMode.equals(ScopedProxyMode.TARGET_CLASS);
    return ScopedProxyCreator.createScopedProxy(definition, registry, proxyTargetClass);
}

Because the ScopedProxyMode is TARGET_CLASS rather than NO, proxyTargetClass is true. Continue into ScopedProxyCreator.createScopedProxy:

//ScopedProxyCreator
public static BeanDefinitionHolder createScopedProxy(
        BeanDefinitionHolder definitionHolder, BeanDefinitionRegistry registry, boolean proxyTargetClass) {

    return ScopedProxyUtils.createScopedProxy(definitionHolder, registry, proxyTargetClass);
}

And into ScopedProxyUtils.createScopedProxy:

public static BeanDefinitionHolder createScopedProxy(BeanDefinitionHolder definition,
        BeanDefinitionRegistry registry, boolean proxyTargetClass) {

    String originalBeanName = definition.getBeanName();
    BeanDefinition targetDefinition = definition.getBeanDefinition();
    //the target bean name is the original name prefixed with "scopedTarget."
    String targetBeanName = getTargetBeanName(originalBeanName);

    // Create a scoped proxy definition for the original bean name,
    // "hiding" the target bean in an internal target definition.
    //the proxy definition's bean class is ScopedProxyFactoryBean
    RootBeanDefinition proxyDefinition = new RootBeanDefinition(ScopedProxyFactoryBean.class);
    proxyDefinition.setDecoratedDefinition(new BeanDefinitionHolder(targetDefinition, targetBeanName));
    proxyDefinition.setOriginatingBeanDefinition(targetDefinition);
    proxyDefinition.setSource(definition.getSource());
    proxyDefinition.setRole(targetDefinition.getRole());

    proxyDefinition.getPropertyValues().add("targetBeanName", targetBeanName);
    if (proxyTargetClass) {
        targetDefinition.setAttribute(AutoProxyUtils.PRESERVE_TARGET_CLASS_ATTRIBUTE, Boolean.TRUE);
        // ScopedProxyFactoryBean's "proxyTargetClass" default is TRUE, so we don't need to set it explicitly here.
    }
    else {
        proxyDefinition.getPropertyValues().add("proxyTargetClass", Boolean.FALSE);
    }

    // Copy autowire settings from original bean definition.
    proxyDefinition.setAutowireCandidate(targetDefinition.isAutowireCandidate());
    proxyDefinition.setPrimary(targetDefinition.isPrimary());
    if (targetDefinition instanceof AbstractBeanDefinition abd) {
        proxyDefinition.copyQualifiersFrom(abd);
    }

    // The target bean should be ignored in favor of the scoped proxy.
    //these two attributes are set to false on the actual target definition
    targetDefinition.setAutowireCandidate(false);
    targetDefinition.setPrimary(false);

    // Register the target bean as separate bean in the factory.
    registry.registerBeanDefinition(targetBeanName, targetDefinition);

    // Return the scoped proxy definition as primary bean definition
    // (potentially an inner bean).
    //the proxy beanDefinition is returned under the original bean name
    return new BeanDefinitionHolder(proxyDefinition, originalBeanName, definition.getAliases());
}

This is where the proxy BeanDefinition is actually built. getTargetBeanName prefixes the original bean name with scopedTarget. to produce the target bean's name, and the proxy definition's bean class is ScopedProxyFactoryBean.

To summarize:

  1. The original (target) bean definition is registered under the name "scopedTarget." + originalBeanName
  2. A proxy bean definition is created whose bean class is ScopedProxyFactoryBean
  3. The original definition's autowireCandidate and primary are set to false, so the original bean is effectively ignored in favor of the new proxy bean
  4. The proxy BeanDefinition is registered under the original bean name, quietly swapping itself in for the real bean (a usage example follows below)
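
For context, this is roughly what such a bean looks like on the application side (GreetingService and demo.greeting are hypothetical names, not from this article's project). The reference injected elsewhere points at the ScopedProxyFactoryBean-generated proxy registered under the original name, and each method call is routed to the current target instance held by the refresh scope:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;

// hypothetical example: after a Nacos config change and the resulting refresh,
// the next call on this bean hits a freshly created instance with the new value
@Component
@RefreshScope
public class GreetingService {

    @Value("${demo.greeting:hello}")
    private String greeting;

    public String greet(String who) {
        return greeting + ", " + who;
    }
}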

When Spring creates the bean instance, we arrive at the core code, AbstractBeanFactory's doGetBean method:

/**
 * Return an instance, which may be shared or independent, of the specified bean.
 * @param name the name of the bean to retrieve
 * @param requiredType the required type of the bean to retrieve
 * @param args arguments to use when creating a bean instance using explicit arguments
 * (only applied when creating a new instance as opposed to retrieving an existing one)
 * @param typeCheckOnly whether the instance is obtained for a type check,
 * not for actual use
 * @return an instance of the bean
 * @throws BeansException if the bean could not be created
 */
@SuppressWarnings("unchecked")
protected <T> T doGetBean(
        String name, @Nullable Class<T> requiredType, @Nullable Object[] args, boolean typeCheckOnly)
        throws BeansException {

    String beanName = transformedBeanName(name);
    Object beanInstance;

    // Eagerly check singleton cache for manually registered singletons.
    Object sharedInstance = getSingleton(beanName);
    if (sharedInstance != null && args == null) {
        if (logger.isTraceEnabled()) {
            if (isSingletonCurrentlyInCreation(beanName)) {
                logger.trace("Returning eagerly cached instance of singleton bean '" + beanName +
                        "' that is not fully initialized yet - a consequence of a circular reference");
            }
            else {
                logger.trace("Returning cached instance of singleton bean '" + beanName + "'");
            }
        }
        beanInstance = getObjectForBeanInstance(sharedInstance, name, beanName, null);
    }

    else {
        // Fail if we're already creating this bean instance:
        // We're assumably within a circular reference.
        if (isPrototypeCurrentlyInCreation(beanName)) {
            throw new BeanCurrentlyInCreationException(beanName);
        }

        // Check if bean definition exists in this factory.
        BeanFactory parentBeanFactory = getParentBeanFactory();
        if (parentBeanFactory != null && !containsBeanDefinition(beanName)) {
            // Not found -> check parent.
            String nameToLookup = originalBeanName(name);
            if (parentBeanFactory instanceof AbstractBeanFactory abf) {
                return abf.doGetBean(nameToLookup, requiredType, args, typeCheckOnly);
            }
            else if (args != null) {
                // Delegation to parent with explicit args.
                return (T) parentBeanFactory.getBean(nameToLookup, args);
            }
            else if (requiredType != null) {
                // No args -> delegate to standard getBean method.
                return parentBeanFactory.getBean(nameToLookup, requiredType);
            }
            else {
                return (T) parentBeanFactory.getBean(nameToLookup);
            }
        }

        if (!typeCheckOnly) {
            markBeanAsCreated(beanName);
        }

        StartupStep beanCreation = this.applicationStartup.start("spring.beans.instantiate")
                .tag("beanName", name);
        try {
            if (requiredType != null) {
                beanCreation.tag("beanType", requiredType::toString);
            }
            RootBeanDefinition mbd = getMergedLocalBeanDefinition(beanName);
            checkMergedBeanDefinition(mbd, beanName, args);

            // Guarantee initialization of beans that the current bean depends on.
            String[] dependsOn = mbd.getDependsOn();
            if (dependsOn != null) {
                for (String dep : dependsOn) {
                    if (isDependent(beanName, dep)) {
                        throw new BeanCreationException(mbd.getResourceDescription(), beanName,
                                "Circular depends-on relationship between '" + beanName + "' and '" + dep + "'");
                    }
                    registerDependentBean(dep, beanName);
                    try {
                        getBean(dep);
                    }
                    catch (NoSuchBeanDefinitionException ex) {
                        throw new BeanCreationException(mbd.getResourceDescription(), beanName,
                                "'" + beanName + "' depends on missing bean '" + dep + "'", ex);
                    }
                }
            }

            // Create bean instance.
            if (mbd.isSingleton()) {
                sharedInstance = getSingleton(beanName, () -> {
                    try {
                        return createBean(beanName, mbd, args);
                    }
                    catch (BeansException ex) {
                        // Explicitly remove instance from singleton cache: It might have been put there
                        // eagerly by the creation process, to allow for circular reference resolution.
                        // Also remove any beans that received a temporary reference to the bean.
                        destroySingleton(beanName);
                        throw ex;
                    }
                });
                beanInstance = getObjectForBeanInstance(sharedInstance, name, beanName, mbd);
            }

            else if (mbd.isPrototype()) {
                // It's a prototype -> create a new instance.
                Object prototypeInstance = null;
                try {
                    beforePrototypeCreation(beanName);
                    prototypeInstance = createBean(beanName, mbd, args);
                }
                finally {
                    afterPrototypeCreation(beanName);
                }
                beanInstance = getObjectForBeanInstance(prototypeInstance, name, beanName, mbd);
            }

            else {
                String scopeName = mbd.getScope();
                if (!StringUtils.hasLength(scopeName)) {
                    throw new IllegalStateException("No scope name defined for bean '" + beanName + "'");
                }
                Scope scope = this.scopes.get(scopeName);
                if (scope == null) {
                    throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
                }
                try {
                    Object scopedInstance = scope.get(beanName, () -> {
                        beforePrototypeCreation(beanName);
                        try {
                            return createBean(beanName, mbd, args);
                        }
                        finally {
                            afterPrototypeCreation(beanName);
                        }
                    });
                    beanInstance = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
                }
                catch (IllegalStateException ex) {
                    throw new ScopeNotActiveException(beanName, scopeName, ex);
                }
            }
        }
        catch (BeansException ex) {
            beanCreation.tag("exception", ex.getClass().toString());
            beanCreation.tag("message", String.valueOf(ex.getMessage()));
            cleanupAfterBeanCreationFailure(beanName);
            throw ex;
        }
        finally {
            beanCreation.end();
        }
    }

    return adaptBeanInstance(name, beanInstance, requiredType);
}

Different scopes are created in different ways; let's look at the branch taken when the scope is refresh:

String scopeName = mbd.getScope();
if (!StringUtils.hasLength(scopeName)) {
    throw new IllegalStateException("No scope name defined for bean '" + beanName + "'");
}
Scope scope = this.scopes.get(scopeName);
if (scope == null) {
    throw new IllegalStateException("No Scope registered for scope name '" + scopeName + "'");
}
try {
    Object scopedInstance = scope.get(beanName, () -> {
        beforePrototypeCreation(beanName);
        try {
            return createBean(beanName, mbd, args);
        }
        finally {
            afterPrototypeCreation(beanName);
        }
    });
    beanInstance = getObjectForBeanInstance(scopedInstance, name, beanName, mbd);
}
catch (IllegalStateException ex) {
    throw new ScopeNotActiveException(beanName, scopeName, ex);
}

Step into scope.get, which takes us to the GenericScope class:

@Override
public Object get(String name, ObjectFactory<?> objectFactory) {
    BeanLifecycleWrapper value = this.cache.put(name, new BeanLifecycleWrapper(name, objectFactory));
    this.locks.putIfAbsent(name, new ReentrantReadWriteLock());
    try {
        return value.getBean();
    }
    catch (RuntimeException e) {
        this.errors.put(name, e);
        throw e;
    }
}

Drill into this cache.put call and you eventually reach StandardScopeCache's put method:

@Override
public Object put(String name, Object value) {
    Object result = this.cache.putIfAbsent(name, value);
    if (result != null) {
        return result;
    }
    return value;
}

In other words: if the cache already has an entry for the name, that entry is returned; otherwise the freshly created BeanLifecycleWrapper is used. GenericScope then calls BeanLifecycleWrapper's getBean method, which ultimately calls createBean to create the object. Put simply, this is a caching mechanism: as long as no refresh happens, the cached instance keeps being returned; once a refresh clears the cache, the bean is created anew (a minimal sketch of the pattern follows).
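
A stripped-down sketch of that cache-or-create behavior (not the actual GenericScope/StandardScopeCache code; the Supplier-based factory and the class name ScopeCacheDemo are simplifications for illustration):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class ScopeCacheDemo {

    // name -> lazily created instance, mirroring GenericScope's cache of BeanLifecycleWrapper
    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    // return the cached instance if present, otherwise create and cache a new one
    public Object get(String name, Supplier<Object> objectFactory) {
        return cache.computeIfAbsent(name, key -> objectFactory.get());
    }

    // what refreshAll()/destroy() boils down to: drop everything so the next get() recreates the bean
    public void clear() {
        cache.clear();
    }

    public static void main(String[] args) {
        ScopeCacheDemo scope = new ScopeCacheDemo();
        Object first = scope.get("greetingService", Object::new);
        Object second = scope.get("greetingService", Object::new);
        System.out.println(first == second);   // true: cached
        scope.clear();
        Object third = scope.get("greetingService", Object::new);
        System.out.println(first == third);    // false: recreated after the "refresh"
    }
}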

With that in mind, let's go back to scope.refreshAll(): it essentially just clears this cache. Here is the method:

//RefreshScope
@ManagedOperation(description = "Dispose of the current instance of all beans "
        + "in this scope and force a refresh on next method execution.")
public void refreshAll() {
    super.destroy();
    this.context.publishEvent(new RefreshScopeRefreshedEvent());
}

The this.context.publishEvent(new RefreshScopeRefreshedEvent()) call has no listener by default; it appears to be a hook left for future extension, so let's set it aside and focus on the parent class GenericScope's destroy method:

//GenericScope
@Override
public void destroy() {
    List<Throwable> errors = new ArrayList<>();
    Collection<BeanLifecycleWrapper> wrappers = this.cache.clear();
    for (BeanLifecycleWrapper wrapper : wrappers) {
        try {
            Lock lock = this.locks.get(wrapper.getName()).writeLock();
            lock.lock();
            try {
                wrapper.destroy();
            }
            finally {
                lock.unlock();
            }
        }
        catch (RuntimeException e) {
            errors.add(e);
        }
    }
    if (!errors.isEmpty()) {
        throw wrapIfNecessary(errors.get(0));
    }
    this.errors.clear();
}

As you can see, it calls cache.clear() to empty the cache and destroys each wrapped bean, so the next lookup through the scope recreates the bean with the latest configuration.

III. Summary

The complete flow:

  1. When the client creates an RpcClient and connects to the server, it registers a handler for server-pushed requests and starts a worker thread that keeps checking for config changes
  2. When a configuration on the server changes, the server pushes a notification to the client telling it which config entry changed
  3. On receiving the notification, the client immediately sends the md5 of its cached config to the server for comparison, and the server returns the configs that have changed
  4. The client then requests the server again to fetch the latest config content and overwrites its local copy
  5. Finally, beans annotated with @ConfigurationProperties are destroyed and re-initialized
  6. The cached instances of @RefreshScope beans are removed, so the beans are recreated the next time they are looked up
