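Table of Contents
- Preface
- I. Prelude
- II. Establishing the client connection
  - 2.1 Creating NacosNamingService
  - 2.2 Initializing NacosNamingService
  - 2.3 NamingClientProxyDelegate: establishing the long-lived connection
    - 2.3.1 Creating the gRPC proxy
    - 2.3.2 NamingGrpcClientProxy (gRPC)
      - 2.3.2.1 createClient: creating the client
      - 2.3.2.2 start: establishing the long-lived connection
- III. Registering the client instance
  - 3.1 NamingGrpcClientProxy#registerService
  - 3.2 Sending the registration request
- Summary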
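Preface
This article describes how the Nacos client establishes a long-lived connection with the server when it starts. Environment: client version 2.2.1, server version 3.0.13.
I. Prelude:
The gRPC channel between client and server is actually established during client instance registration: registration has to send a request to the server, so the gRPC channel must be set up first. The client instance registration flow is briefly introduced below.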
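[Flowchart: client instance registration flow]
Steps:
- When the web application hosting the client finishes starting, it publishes a WebServerInitializedEvent;
- AbstractAutoServiceRegistration receives the event in onApplicationEvent and calls start();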
```java
public void onApplicationEvent(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (!(context instanceof ConfigurableWebServerApplicationContext)
            || !"management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
        // record the local web server port
        this.port.compareAndSet(0, event.getWebServer().getPort());
        // kick off registration
        this.start();
    }
}
```
The start() method:
```java
public void start() {
    if (!this.isEnabled()) {
        if (logger.isDebugEnabled()) {
            logger.debug("Discovery Lifecycle disabled. Not starting");
        }
    } else {
        if (!this.running.get()) {
            // publish the pre-registration event
            this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
            this.registrationLifecycles.forEach((registrationLifecycle) -> {
                registrationLifecycle.postProcessBeforeStartRegister(this.getRegistration());
            });
            // register the instance
            this.register();
            this.registrationLifecycles.forEach((registrationLifecycle) -> {
                registrationLifecycle.postProcessAfterStartRegister(this.getRegistration());
            });
            if (this.shouldRegisterManagement()) {
                this.registrationManagementLifecycles.forEach((registrationManagementLifecycle) -> {
                    registrationManagementLifecycle.postProcessBeforeStartRegisterManagement(this.getManagementRegistration());
                });
                this.registerManagement();
                this.registrationManagementLifecycles.forEach((registrationManagementLifecycle) -> {
                    registrationManagementLifecycle.postProcessAfterStartRegisterManagement(this.getManagementRegistration());
                });
            }
            // publish the registration-complete event
            this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
            this.running.compareAndSet(false, true);
        }
    }
}
```
- AbstractAutoServiceRegistration's register() method calls NacosServiceRegistry#register;
- namingService.registerInstance registers the client instance;
```java
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        log.warn("No service to register for nacos client...");
    } else {
        // obtaining the NamingService also sets up the client's long-lived connection
        NamingService namingService = this.namingService();
        String serviceId = registration.getServiceId();
        String group = this.nacosDiscoveryProperties.getGroup();
        // build the client Instance object
        Instance instance = this.getNacosInstanceFromRegistration(registration);
        try {
            // send the instance registration request over gRPC
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished",
                    new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
        } catch (Exception var7) {
            if (this.nacosDiscoveryProperties.isFailFast()) {
                log.error("nacos registry, {} register failed...{},",
                        new Object[]{serviceId, registration.toString(), var7});
                ReflectionUtils.rethrowRuntimeException(var7);
            } else {
                log.warn("Failfast is false. {} register failed...{},",
                        new Object[]{serviceId, registration.toString(), var7});
            }
        }
    }
}
```
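The steps above boil down to a start-once lifecycle: the first WebServerInitializedEvent records the port and registers, and later events are ignored because `running` is already true. The plain-Java sketch below models only that behaviour; `DemoAutoRegistration` and its event strings are hypothetical stand-ins, not Spring Cloud or Nacos classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AbstractAutoServiceRegistration; no Spring or Nacos types are used.
class DemoAutoRegistration {
    final AtomicInteger port = new AtomicInteger(0);
    final AtomicBoolean running = new AtomicBoolean(false);
    final List<String> events = new ArrayList<>();
    private final boolean enabled;

    DemoAutoRegistration(boolean enabled) {
        this.enabled = enabled;
    }

    // Mirrors onApplicationEvent: remember the first web port, then call start()
    void onWebServerInitialized(int webServerPort) {
        port.compareAndSet(0, webServerPort);
        start();
    }

    // Mirrors start(): do nothing when disabled or already running
    void start() {
        if (!enabled) {
            return;
        }
        if (!running.get()) {
            events.add("InstancePreRegisteredEvent");
            register();
            events.add("InstanceRegisteredEvent");
            running.compareAndSet(false, true);
        }
    }

    // In the real flow this is NacosServiceRegistry.register -> namingService.registerInstance
    void register() {
        events.add("register:" + port.get());
    }
}
```

Calling `onWebServerInitialized(8080)` and then `onWebServerInitialized(9090)` registers once, on port 8080.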
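II. Establishing the client connection:
```java
NamingService namingService = this.namingService();
```
This single line does a lot of work. The parts that deserve the closest attention are how the client establishes its connection to the server and the client's failover mechanism; this article covers connection establishment first.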
2.1 Creating NacosNamingService:
```java
public static NamingService createNamingService(Properties properties) throws NacosException {
    try {
        Class<?> driverImplClass = Class.forName("com.alibaba.nacos.client.naming.NacosNamingService");
        // invoke NacosNamingService's constructor, passing in the configuration properties
        Constructor constructor = driverImplClass.getConstructor(Properties.class);
        return (NamingService) constructor.newInstance(properties);
    } catch (Throwable e) {
        throw new NacosException(NacosException.CLIENT_INVALID_PARAM, e);
    }
}
```
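createNamingService loads NacosNamingService by name and calls its `(Properties)` constructor reflectively. The sketch below shows the same reflection pattern with hypothetical `DemoNamingService` / `DemoNacosNamingService` types; the `"namespace"` property key and the use of IllegalArgumentException in place of NacosException are assumptions made to keep it self-contained.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical interface/implementation pair standing in for NamingService / NacosNamingService
interface DemoNamingService {
    String namespace();
}

class DemoNacosNamingService implements DemoNamingService {
    private final String namespace;

    // Public (Properties) constructor, so getConstructor(Properties.class) can find it
    public DemoNacosNamingService(Properties properties) {
        this.namespace = properties.getProperty("namespace", "public");
    }

    @Override
    public String namespace() {
        return namespace;
    }
}

class DemoNamingFactory {
    // Load the implementation by name and invoke its (Properties) constructor
    static DemoNamingService create(String implClassName, Properties properties) {
        try {
            Class<?> impl = Class.forName(implClassName);
            Constructor<?> constructor = impl.getConstructor(Properties.class);
            return (DemoNamingService) constructor.newInstance(properties);
        } catch (Throwable e) {
            // createNamingService wraps any failure in NacosException(CLIENT_INVALID_PARAM, e)
            throw new IllegalArgumentException("cannot create naming service: " + implClassName, e);
        }
    }
}
```

Loading by class name keeps the API module free of a compile-time dependency on the client implementation.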
2.2 Initializing NacosNamingService:
```java
public NacosNamingService(Properties properties) throws NacosException {
    init(properties);
}

private void init(Properties properties) throws NacosException {
    PreInitUtils.asyncPreLoadCostComponent();
    // custom properties
    final NacosClientProperties nacosClientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
    ValidatorUtils.checkInitParam(nacosClientProperties);
    // namespace
    this.namespace = InitUtils.initNamespaceForNaming(nacosClientProperties);
    InitUtils.initSerialization();
    InitUtils.initWebRootContext(nacosClientProperties);
    // set the log name property
    initLogName(nacosClientProperties);
    this.notifierEventScope = UUID.randomUUID().toString();
    // InstancesChangeNotifier extends Subscriber<InstancesChangeEvent>: it subscribes to
    // InstancesChangeEvent, and its onEvent method is called when instances change
    this.changeNotifier = new InstancesChangeNotifier(this.notifierEventScope);
    // register the event publisher
    NotifyCenter.registerToPublisher(InstancesChangeEvent.class, 16384);
    // register the subscriber
    NotifyCenter.registerSubscriber(changeNotifier);
    // holds service info (and the failover mechanism)
    this.serviceInfoHolder = new ServiceInfoHolder(namespace, this.notifierEventScope, nacosClientProperties);
    // client proxy
    this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, nacosClientProperties,
            changeNotifier);
}
```
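The notifierEventScope UUID lets each NacosNamingService instance react only to its own InstancesChangeEvents. Below is a minimal, synchronous sketch of that scope filtering, assuming the notifier compares the event's scope with its own. `DemoNotifyCenter` is a hypothetical stand-in for NotifyCenter, which in Nacos dispatches events asynchronously through a publisher (the 16384 passed to registerToPublisher appears to be that publisher's queue size).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical event carrying the scope of the NamingService that produced it
class DemoInstancesChangeEvent {
    final String scope;
    final String serviceName;

    DemoInstancesChangeEvent(String scope, String serviceName) {
        this.scope = scope;
        this.serviceName = serviceName;
    }
}

// Hypothetical subscriber that ignores events from other NamingService instances
class DemoChangeNotifier {
    final String eventScope;
    final List<String> received = new ArrayList<>();

    DemoChangeNotifier(String eventScope) {
        this.eventScope = eventScope;
    }

    boolean scopeMatches(DemoInstancesChangeEvent event) {
        return eventScope.equals(event.scope);
    }

    void onEvent(DemoInstancesChangeEvent event) {
        received.add(event.serviceName);
    }
}

// Hypothetical synchronous event bus: deliver only to subscribers whose scope matches
class DemoNotifyCenter {
    private final List<DemoChangeNotifier> subscribers = new ArrayList<>();

    void registerSubscriber(DemoChangeNotifier subscriber) {
        subscribers.add(subscriber);
    }

    void publish(DemoInstancesChangeEvent event) {
        for (DemoChangeNotifier s : subscribers) {
            if (s.scopeMatches(event)) {
                s.onEvent(event);
            }
        }
    }
}
```

Two notifiers with different random scopes can share one bus without seeing each other's events.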
2.3 NamingClientProxyDelegate: establishing the long-lived connection:
2.3.1 Creating the gRPC proxy:
```java
public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder,
        NacosClientProperties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
    // service info update service
    this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this,
            changeNotifier);
    // server address list manager
    this.serverListManager = new ServerListManager(properties, namespace);
    this.serviceInfoHolder = serviceInfoHolder;
    this.securityProxy = new SecurityProxy(this.serverListManager.getServerList(),
            NamingHttpClientManager.getInstance().getNacosRestTemplate());
    initSecurityProxy(properties);
    // HTTP proxy
    this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties);
    // gRPC proxy
    this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties,
            serviceInfoHolder);
}
```
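NamingClientProxyDelegate holds both an HTTP and a gRPC proxy and forwards each call to one of them. A sketch of that delegation follows; the routing rule (ephemeral instances over gRPC, persistent ones over HTTP) is our assumption about Nacos 2.x behaviour, and all `Demo*` types are hypothetical.

```java
// Hypothetical minimal proxy interface standing in for NamingClientProxy
interface DemoClientProxy {
    String registerService(String serviceName, boolean ephemeral);
}

class DemoGrpcProxy implements DemoClientProxy {
    @Override
    public String registerService(String serviceName, boolean ephemeral) {
        return "grpc:" + serviceName;
    }
}

class DemoHttpProxy implements DemoClientProxy {
    @Override
    public String registerService(String serviceName, boolean ephemeral) {
        return "http:" + serviceName;
    }
}

// Delegate that owns both proxies and routes every call to one of them
class DemoClientProxyDelegate implements DemoClientProxy {
    private final DemoClientProxy grpcClientProxy = new DemoGrpcProxy();
    private final DemoClientProxy httpClientProxy = new DemoHttpProxy();

    // Assumed rule: ephemeral instances use the gRPC long connection, persistent ones use HTTP
    private DemoClientProxy getExecuteClientProxy(boolean ephemeral) {
        return ephemeral ? grpcClientProxy : httpClientProxy;
    }

    @Override
    public String registerService(String serviceName, boolean ephemeral) {
        return getExecuteClientProxy(ephemeral).registerService(serviceName, ephemeral);
    }
}
```

Callers only ever see the delegate, so the transport choice stays an internal detail.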
2.3.2 NamingGrpcClientProxy (gRPC):
```java
public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory,
        NacosClientProperties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    super(securityProxy);
    this.namespaceId = namespaceId;
    this.uuid = UUID.randomUUID().toString();
    // request timeout
    this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
    Map<String, String> labels = new HashMap<>();
    // source: sdk
    labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
    // module: naming (service registry)
    labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
    labels.put(Constants.APPNAME, AppNameUtils.getAppName());
    // create the RPC client
    this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels,
            RpcClientTlsConfig.properties(properties.asProperties()));
    this.redoService = new NamingGrpcRedoService(this, properties);
    NAMING_LOGGER.info("Create naming rpc client for uuid->{}", uuid);
    start(serverListFactory, serviceInfoHolder);
}
```
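The constructor tags the connection with labels and reads the naming request timeout, defaulting to `-1`. The sketch mirrors that setup; the label strings and the `namingRequestTimeout` key are assumed values, not copies of RemoteConstants or CommonParams. A negative timeout means no explicit timeout is passed, which matches the `requestTimeout < 0` check in requestToServer.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper mirroring the label and timeout setup in NamingGrpcClientProxy's constructor.
// The key/value strings below are assumptions, not the real RemoteConstants/CommonParams values.
class DemoNamingRpcSetup {
    static final String TIMEOUT_KEY = "namingRequestTimeout"; // assumed property key

    static Map<String, String> labels(String appName) {
        Map<String, String> labels = new HashMap<>();
        labels.put("source", "sdk");    // who opened the connection
        labels.put("module", "naming"); // which Nacos module the connection serves
        labels.put("AppName", appName);
        return Collections.unmodifiableMap(labels);
    }

    // "-1" (the default) means no explicit timeout: requests then use the RpcClient's default
    static long requestTimeout(Properties properties) {
        return Long.parseLong(properties.getProperty(TIMEOUT_KEY, "-1"));
    }
}
```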
2.3.2.1 createClient: creating the client:
```java
public static RpcClient createClient(String clientName, ConnectionType connectionType, Integer threadPoolCoreSize,
        Integer threadPoolMaxSize, Map<String, String> labels, RpcClientTlsConfig tlsConfig) {
    // only gRPC is supported; any other type throws
    if (!ConnectionType.GRPC.equals(connectionType)) {
        throw new UnsupportedOperationException("unsupported connection type :" + connectionType.getType());
    }
    // create the client, cached in Map<String, RpcClient> CLIENT_MAP
    return CLIENT_MAP.computeIfAbsent(clientName, clientNameInner -> {
        LOGGER.info("[RpcClientFactory] create a new rpc client of " + clientName);
        return new GrpcSdkClient(clientNameInner, threadPoolCoreSize, threadPoolMaxSize, labels, tlsConfig);
    });
}
```
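Because CLIENT_MAP.computeIfAbsent is keyed by client name, each name maps to exactly one RpcClient; since NamingGrpcClientProxy passes a fresh random UUID as the name, each proxy ends up with its own client. A hypothetical sketch of that caching factory (`Demo*` names are illustrative only):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

enum DemoConnectionType { GRPC, HTTP }

// Hypothetical stand-in for GrpcSdkClient; counts how many instances were ever built
class DemoRpcClient {
    static final AtomicInteger CREATED = new AtomicInteger();
    final String name;

    DemoRpcClient(String name) {
        this.name = name;
        CREATED.incrementAndGet();
    }
}

class DemoRpcClientFactory {
    private static final Map<String, DemoRpcClient> CLIENT_MAP = new ConcurrentHashMap<>();

    static DemoRpcClient createClient(String clientName, DemoConnectionType type) {
        if (type != DemoConnectionType.GRPC) {
            throw new UnsupportedOperationException("unsupported connection type :" + type);
        }
        // computeIfAbsent builds at most one client per name, even under concurrent calls
        return CLIENT_MAP.computeIfAbsent(clientName, DemoRpcClient::new);
    }
}
```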
2.3.2.2 start: establishing the long-lived connection:
```java
private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
    // factory supplying the server addresses
    rpcClient.serverListFactory(serverListFactory);
    // register the redo service as a connection listener
    rpcClient.registerConnectionListener(redoService);
    // handler for requests pushed by the server
    rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
    // start the client
    rpcClient.start();
    NotifyCenter.registerSubscriber(this);
}
```
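start() registers handlers before the client starts, so server pushes arriving right after connecting can already be served. Assuming the client asks each registered handler in turn and uses the first non-null response (consistent with the ClientDetectionRequest handler at the end of RpcClient.start(), which returns null for any other request), dispatch can be sketched as follows; all types here are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical request/response types standing in for Nacos server-push payloads
class DemoPushRequest {
    final String type;

    DemoPushRequest(String type) {
        this.type = type;
    }
}

class DemoPushResponse {
    final String handledBy;

    DemoPushResponse(String handledBy) {
        this.handledBy = handledBy;
    }
}

interface DemoServerRequestHandler {
    // Return null when this handler does not recognise the request
    DemoPushResponse requestReply(DemoPushRequest request);
}

class DemoHandlerRegistry {
    private final List<DemoServerRequestHandler> handlers = new CopyOnWriteArrayList<>();

    void registerServerRequestHandler(DemoServerRequestHandler handler) {
        handlers.add(handler);
    }

    // Ask each handler in registration order; the first non-null response wins
    DemoPushResponse handleServerRequest(DemoPushRequest request) {
        for (DemoServerRequestHandler handler : handlers) {
            DemoPushResponse response = handler.requestReply(request);
            if (response != null) {
                return response;
            }
        }
        return null;
    }
}
```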
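rpcClient.start() does three things (the implementation details will be covered in a follow-up article):
- Establishing the channel between client and server:
  1) Open a communication channel to the nacos server;
  2) Send a server check request to obtain the connectionId from the nacos server, then create the bidirectional-streaming gRPC stub;
  3) Send the server a request confirming that the client-server connection has been set up.
- Heartbeat detection between client and server:
  1) In a while(true) loop, once the keep-alive interval has elapsed since the last activity, send a healthCheck() request; true means the heartbeat is kept (continue with the next iteration), false means the heartbeat is lost;
  2) When the heartbeat is lost, the client status is switched from healthy to UNHEALTHY;
  3) The reconnect method then tries to re-establish a connection to the nacos server.
- Reconnecting after a disconnect:
  1) connectToServer tries to establish a new connection to the nacos server;
  2) On success, the old connection is marked unusable and closed to release its resources, and a connection-established event is published to the eventLinkedBlockingQueue queue;
  3) On failure, the interval between connection attempts to the nacos server is increased.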
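The reconnect behaviour described in the last group (try the servers, wait longer while attempts keep failing) can be sketched as below. The round-robin order, the 100 ms step and the 5 s cap are illustrative assumptions, not Nacos's actual values, and `DemoReconnector` is hypothetical; delays are recorded in a list instead of calling Thread.sleep so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical reconnect loop: round-robin over servers, backing off after each full failed pass
class DemoReconnector {
    final List<Long> sleeps = new ArrayList<>(); // recorded instead of Thread.sleep

    String reconnect(List<String> servers, Predicate<String> tryConnect, int maxAttempts) {
        long baseDelayMs = 100;
        long maxDelayMs = 5_000;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String server = servers.get(attempt % servers.size());
            if (tryConnect.test(server)) {
                // the real client would now close the old connection and enqueue a CONNECTED event
                return server;
            }
            // after every full pass over the server list, wait a bit longer than last time
            if ((attempt + 1) % servers.size() == 0) {
                int passes = (attempt + 1) / servers.size();
                sleeps.add(Math.min(baseDelayMs * passes, maxDelayMs));
            }
        }
        return null;
    }
}
```

With two unreachable servers and six attempts, the recorded waits grow as 100, 200, 300 ms.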
```java
/**
 * Start this client.
 */
public final void start() throws NacosException {
    // CAS state transition (optimistic locking): only one caller moves INITIALIZED -> STARTING
    boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
    if (!success) {
        return;
    }
    // create the clientEventExecutor pool with 2 threads
    clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
        Thread t = new Thread(r);
        t.setName("com.alibaba.nacos.client.remote.worker");
        t.setDaemon(true);
        return t;
    });

    // connection event consumer.
    // task 1: take connection events from eventLinkedBlockingQueue and notify listeners
    clientEventExecutor.submit(() -> {
        while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
            ConnectionEvent take;
            try {
                take = eventLinkedBlockingQueue.take();
                if (take.isConnected()) {
                    notifyConnected();
                } else if (take.isDisConnected()) {
                    notifyDisConnected();
                }
            } catch (Throwable e) {
                // Do nothing
            }
        }
    });

    // task 2: keep-alive health checks and reconnection
    clientEventExecutor.submit(() -> {
        while (true) {
            try {
                if (isShutdown()) {
                    break;
                }
                // reconnectionSignal: queue of pending reconnect requests
                ReconnectContext reconnectContext = reconnectionSignal
                        .poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
                if (reconnectContext == null) {
                    // no reconnect request: no disconnect has been signalled
                    // check alive time: once the keep-alive interval has elapsed, send a healthCheck request
                    if (System.currentTimeMillis() - lastActiveTimeStamp >= rpcClientConfig.connectionKeepAlive()) {
                        boolean isHealthy = healthCheck();
                        if (!isHealthy) {
                            // the health check failed
                            if (currentConnection == null) {
                                continue;
                            }
                            LoggerUtils.printIfInfoEnabled(LOGGER,
                                    "[{}] Server healthy check fail, currentConnection = {}",
                                    rpcClientConfig.name(), currentConnection.getConnectionId());

                            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
                            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                                break;
                            }
                            // mark the client UNHEALTHY
                            boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
                                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
                            if (statusFLowSuccess) {
                                // the server may have failed: reconnect without a recommended ServerInfo (null)
                                reconnectContext = new ReconnectContext(null, false);
                            } else {
                                continue;
                            }
                        } else {
                            lastActiveTimeStamp = System.currentTimeMillis();
                            continue;
                        }
                    } else {
                        continue;
                    }
                }

                if (reconnectContext.serverInfo != null) {
                    // on a reconnect to a recommended server, check its IP against the server list and take the port from there
                    // clear recommend server if server is not in server list.
                    boolean serverExist = false;
                    for (String server : getServerListFactory().getServerList()) {
                        ServerInfo serverInfo = resolveServerInfo(server);
                        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
                            serverExist = true;
                            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
                            break;
                        }
                    }
                    if (!serverExist) {
                        LoggerUtils.printIfInfoEnabled(LOGGER,
                                "[{}] Recommend server is not in server list, ignore recommend server {}",
                                rpcClientConfig.name(), reconnectContext.serverInfo.getAddress());
                        reconnectContext.serverInfo = null;
                    }
                }
                // reconnect to the server
                reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
            } catch (Throwable throwable) {
                // Do nothing
            }
        }
    });

    // connect to server, try to connect to server sync retryTimes times, async starting if failed.
    // first connection attempt to the nacos server when the client starts
    Connection connectToServer = null;
    rpcClientStatus.set(RpcClientStatus.STARTING);

    int startUpRetryTimes = rpcClientConfig.retryTimes();
    // retry until attempts run out or a connection is obtained
    while (startUpRetryTimes > 0 && connectToServer == null) {
        try {
            startUpRetryTimes--;
            // pick a nacos server address from the server list
            ServerInfo serverInfo = nextRpcServer();
            LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}",
                    rpcClientConfig.name(), serverInfo);
            // connect to the server
            connectToServer = connectToServer(serverInfo);
        } catch (Throwable e) {
            LoggerUtils.printIfWarnEnabled(LOGGER,
                    "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}",
                    rpcClientConfig.name(), e.getMessage(), startUpRetryTimes, e);
        }
    }

    if (connectToServer != null) {
        LoggerUtils.printIfInfoEnabled(LOGGER,
                "[{}] Success to connect to server [{}] on start up, connectionId = {}",
                rpcClientConfig.name(), connectToServer.serverInfo.getAddress(),
                connectToServer.getConnectionId());
        // on success, put a CONNECTED event on eventLinkedBlockingQueue for task 1 to consume
        this.currentConnection = connectToServer;
        rpcClientStatus.set(RpcClientStatus.RUNNING);
        eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
    } else {
        // on failure, put a signal on reconnectionSignal; consuming it triggers the reconnect logic in task 2
        switchServerAsync();
    }

    // register the connect-reset request handler
    registerServerRequestHandler(new ConnectResetRequestHandler());

    // register client detection request.
    registerServerRequestHandler(request -> {
        if (request instanceof ClientDetectionRequest) {
            return new ClientDetectionResponse();
        }
        return null;
    });
}
```
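A condensed model of the startup path above: a CAS guard so only one start() proceeds, a synchronous retry loop bounded by retryTimes, and, if every attempt fails, a reconnect signal handed to the background worker (the role of switchServerAsync). It also includes the keep-alive test the worker applies before calling healthCheck(). Everything here (`DemoRpcClientStartup`, string events, round-robin server selection) is a hypothetical simplification of RpcClient.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

enum DemoStatus { INITIALIZED, STARTING, RUNNING, UNHEALTHY, SHUTDOWN }

// Hypothetical condensed model of RpcClient.start()
class DemoRpcClientStartup {
    final AtomicReference<DemoStatus> status = new AtomicReference<>(DemoStatus.INITIALIZED);
    final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();         // like eventLinkedBlockingQueue
    final BlockingQueue<String> reconnectionSignal = new LinkedBlockingQueue<>(); // like reconnectionSignal
    String currentConnection;

    void start(List<String> servers, int retryTimes, Function<String, String> connectToServer) {
        // only the first caller moves INITIALIZED -> STARTING; later calls return immediately
        if (!status.compareAndSet(DemoStatus.INITIALIZED, DemoStatus.STARTING)) {
            return;
        }
        String connection = null;
        int attemptsLeft = retryTimes;
        int index = 0;
        while (attemptsLeft > 0 && connection == null) {
            attemptsLeft--;
            String server = servers.get(index++ % servers.size()); // next server, round-robin
            try {
                connection = connectToServer.apply(server);
            } catch (RuntimeException e) {
                // the real client logs a warning and tries the next server
            }
        }
        if (connection != null) {
            currentConnection = connection;
            status.set(DemoStatus.RUNNING);
            eventQueue.offer("CONNECTED");
        } else {
            // let the background worker keep trying asynchronously
            reconnectionSignal.offer("RECONNECT");
        }
    }

    // keep-alive rule from the worker loop: only probe once the connection has been idle long enough
    static boolean shouldHealthCheck(long nowMs, long lastActiveMs, long keepAliveMs) {
        return nowMs - lastActiveMs >= keepAliveMs;
    }
}
```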
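III. Registering the client instance:
Once the communication channel to the server has been established, the nacos client instance can be registered with namingService.registerInstance(serviceId, group, instance). NacosNamingService hands the call to its NamingClientProxyDelegate, which, for an ephemeral instance (the default), forwards it to NamingGrpcClientProxy.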
3.1 NamingGrpcClientProxy#registerService:
```java
@Override
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName,
            instance);
    // cache the instance in ConcurrentMap<String, InstanceRedoData> registeredInstances, key: groupName@@serviceName
    redoService.cacheInstanceForRedo(serviceName, groupName, instance);
    // send the registration request to the nacos server, then mark the InstanceRedoData as registered
    doRegisterService(serviceName, groupName, instance);
}
```
```java
public void cacheInstanceForRedo(String serviceName, String groupName, Instance instance) {
    // key: groupName@@serviceName
    String key = NamingUtils.getGroupedName(serviceName, groupName);
    // build the redo data (initially in the "not registered" state) and cache it in registeredInstances
    InstanceRedoData redoData = InstanceRedoData.build(serviceName, groupName, instance);
    synchronized (registeredInstances) {
        registeredInstances.put(key, redoData);
    }
}
```
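The redo cache is what lets registrations survive a reconnect. In the hypothetical sketch below, entries are keyed `groupName@@serviceName`, start as unregistered, and flip to registered once the server answers. The `onDisConnect` behaviour (flagging every entry for re-registration) is our assumption about what the redo service does as a connection listener, not code taken from NamingGrpcRedoService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of one cached registration
class DemoInstanceRedoData {
    final String serviceName;
    final String groupName;
    volatile boolean registered; // false until the server acknowledges the registration

    DemoInstanceRedoData(String serviceName, String groupName) {
        this.serviceName = serviceName;
        this.groupName = groupName;
    }
}

class DemoRedoService {
    final Map<String, DemoInstanceRedoData> registeredInstances = new ConcurrentHashMap<>();

    // Same key shape as NamingUtils.getGroupedName: groupName@@serviceName
    static String key(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    void cacheInstanceForRedo(String serviceName, String groupName) {
        registeredInstances.put(key(serviceName, groupName), new DemoInstanceRedoData(serviceName, groupName));
    }

    void instanceRegistered(String serviceName, String groupName) {
        DemoInstanceRedoData data = registeredInstances.get(key(serviceName, groupName));
        if (data != null) {
            data.registered = true;
        }
    }

    // Assumption: after a disconnect every cached instance needs to be registered again
    void onDisConnect() {
        registeredInstances.values().forEach(d -> d.registered = false);
    }

    // What a periodic redo task would resend once the connection is back
    List<String> findInstancesNeedingRedo() {
        List<String> keys = new ArrayList<>();
        registeredInstances.forEach((k, d) -> {
            if (!d.registered) {
                keys.add(k);
            }
        });
        return keys;
    }
}
```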
3.2 Sending the registration request:
```java
public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
    InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName,
            NamingRemoteConstants.REGISTER_INSTANCE, instance);
    // send the registration request to the server
    requestToServer(request, Response.class);
    // the request succeeded: mark this instance as registered
    redoService.instanceRegistered(serviceName, groupName);
}
```
requestToServer(request, Response.class) is implemented as follows:
```java
private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass)
        throws NacosException {
    try {
        request.putAllHeader(
                getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
        // send the InstanceRequest over the rpcClient's channel
        Response response =
                requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
        if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
            throw new NacosException(response.getErrorCode(), response.getMessage());
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return (T) response;
        }
        NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'",
                response.getClass().getName(), responseClass.getName());
    } catch (NacosException e) {
        throw e;
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
    }
    throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
}
```
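requestToServer accepts a response only if its result code is SUCCESS and it is an instance of the expected class. The sketch reproduces those two checks with hypothetical types; the codes 200 and 500 and the unchecked `DemoNamingException` (NacosException itself is a checked exception) are assumptions made for brevity.

```java
// Hypothetical response hierarchy standing in for Nacos's Response classes
class DemoNamingResponse {
    static final int SUCCESS = 200;
    int resultCode = SUCCESS;
    int errorCode;
    String message;
}

class DemoInstanceResponse extends DemoNamingResponse { }

class DemoQueryResponse extends DemoNamingResponse { }

// Unchecked here only to keep the sketch short
class DemoNamingException extends RuntimeException {
    final int errCode;

    DemoNamingException(int errCode, String message) {
        super(message);
        this.errCode = errCode;
    }
}

class DemoResponseChecker {
    static final int SERVER_ERROR = 500;

    // Same order as requestToServer: check the result code first, then the expected response type
    static <T extends DemoNamingResponse> T check(DemoNamingResponse response, Class<T> responseClass) {
        if (response.resultCode != DemoNamingResponse.SUCCESS) {
            throw new DemoNamingException(response.errorCode, response.message);
        }
        if (responseClass.isAssignableFrom(response.getClass())) {
            return responseClass.cast(response);
        }
        throw new DemoNamingException(SERVER_ERROR, "Server return invalid response");
    }
}
```

Passing the base class (as doRegisterService does with `Response.class`) accepts any successful response.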
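Summary
After the client application starts, it publishes a WebServerInitializedEvent. The nacos client then opens a communication channel to the server and sends a server check request; a normal response yields the connection id. Next it creates the bidirectional-streaming gRPC stub and sends the request confirming that the connection is established. Finally, it sends the client instance registration request to the nacos server.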