kafka源码阅读-ReplicaStateMachine(副本状态机)解析

概述

Kafka源码包含多个模块,每个模块负责不同的功能。以下是一些核心模块及其功能的概述:

  1. 服务端源码 :实现Kafka Broker的核心功能,包括日志存储、控制器、协调器、元数据管理及状态机管理、延迟机制、消费者组管理、高并发网络架构模型实现等。

  2. Java客户端源码 :实现了Producer和Consumer与Broker的交互机制,以及通用组件支撑代码。

  3. Connect源码 :用来构建异构数据双向流式同步服务。

  4. Stream源码 :用来实现实时流处理相关功能。

  5. Raft源码 :实现了Raft一致性协议。

  6. Admin模块 :Kafka的管理员模块,操作和管理其topic,partition相关,包含创建,删除topic,或者拓展分区等。

  7. Api模块 :负责数据交互,客户端与服务端交互数据的编码与解码。

  8. Client模块 :包含Producer读取Kafka Broker元数据信息的类,如topic和分区,以及leader。

  9. Cluster模块 :包含Broker、Cluster、Partition、Replica等实体类。

  10. Common模块 :包含各种异常类以及错误验证。

  11. Consumer模块 :消费者处理模块,负责客户端消费者数据和逻辑处理。

  12. Controller模块 :负责中央控制器的选举,分区的Leader选举,Replica的分配或重新分配,分区和副本的扩容等。

  13. Coordinator模块 :负责管理部分consumer group和他们的offset。

  14. Javaapi模块 :提供Java语言的Producer和Consumer的API接口。

  15. Log模块 :负责Kafka文件存储,读写所有Topic消息数据。

  16. Message模块 :封装多条数据组成数据集或压缩数据集。

  17. Metrics模块 :负责内部状态监控。

  18. Network模块 :处理客户端连接,网络事件模块。

  19. Producer模块 :生产者细节实现,包括同步和异步消息发送。

  20. Security模块 :负责Kafka的安全验证和管理。

  21. Serializer模块 :序列化和反序列化消息内容。

  22. Server模块 :涉及Leader和Offset的checkpoint,动态配置,延时创建和删除Topic,Leader选举,Admin和Replica管理等。

  23. Tools模块 :包含多种工具,如导出consumer offset值,LogSegments信息,Topic的log位置信息,Zookeeper上的offset值等。

  24. Utils模块 :包含各种工具类,如Json,ZkUtils,线程池工具类,KafkaScheduler公共调度器类等。

这些模块共同构成了Kafka的整体架构,使其能够提供高吞吐量、高可用性的消息队列服务。

kafka源码分支为1.0.2

分区状态机记录着当前集群所有 Partition 的状态信息以及如何对 Partition 状态转移进行相应的处理;副本状态机则是记录着当前集群所有 Replica 的状态信息以及如何对 Replica 状态转变进行相应的处理。

kafkaController初始化时,会启动replicaStateMachine和partitionStateMachine:

    //在 KafkaController 中//有两个状态机:分区状态机和副本状态机;//一个管理器:Channel 管理器,负责管理所有的 Broker 通信;//相关缓存:Partition 信息、Topic 信息、broker id 信息等;//四种 leader 选举机制:分别是用 leader offline、broker 掉线、partition reassign、最优 leader 选举时触发;//启动副本状态机,初始化所有 Replica 的状态信息,如果 Replica 所在节点是 alive 的,那么状态更新为 OnlineReplica, 否则更新为 ReplicaDeletionIneligible;replicaStateMachine.startup()//启动分区状态机,初始化所有 Partition 的状态信息,如果 leader 所在 broker 是 alive 的,那么状态更新为 OnlinePartition,否则更新为 OfflinePartitionpartitionStateMachine.startup()

ReplicaStateMachine类相关方法:

  /*** Invoked on successful controller election. First registers a broker change listener since that triggers all* state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.* Then triggers the OnlineReplica state change for all replicas.*/def startup() {// //初始化所有副本的状态信息initializeReplicaState()//将online的replica状态转变为OnlineReplicahandleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)info("Started replica state machine with initial state -> " + replicaState.toString())}/*** Invoked on startup of the replica's state machine to set the initial state for replicas of all existing partitions* in zookeeper*///初始化所有副本的状态信息// 这里只是将 Replica 的状态信息更新副本状态机的缓存 replicaState 中,并没有真正进行状态转移的操作。private def initializeReplicaState() {for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {val topic = topicPartition.topicval partition = topicPartition.partitionassignedReplicas.foreach { replicaId =>val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)//如果 Replica 所在机器是 alive 的,那么将其状态设置为 OnlineReplica//replicaId即brokerIdif (controllerContext.isReplicaOnline(replicaId, topicPartition))replicaState.put(partitionAndReplica, OnlineReplica)else {// mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.// This is required during controller failover since during controller failover a broker can go down,// so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.//否则设置为 ReplicaDeletionIneligible 状态replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)}}}}/*** This API is invoked by the broker change controller callbacks and the startup API of the state machine* @param replicas     The list of replicas (brokers) that need to be transitioned to the target state* @param targetState  The state that the replicas should be moved to* The controller's allLeaders cache should have been updated before this*///用于处理 Replica 状态的变化def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,callbacks: Callbacks = (new CallbackBuilder).build) {if (replicas.nonEmpty) {info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))try {brokerRequestBatch.newBatch()//状态转变replicas.foreach(r => handleStateChange(r, targetState, callbacks))//向 broker 发送相应请求brokerRequestBatch.sendRequestsToBrokers(controller.epoch)} catch {case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)}}}/*** This API exercises the replica's state machine. It ensures that every state transition happens from a legal* previous state to the target state. Valid state transitions are:* NonExistentReplica --> NewReplica* --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the*   partition to every live broker** NewReplica -> OnlineReplica* --add the new replica to the assigned replica list if needed** OnlineReplica,OfflineReplica -> OnlineReplica* --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the*   partition to every live broker** NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica* --send StopReplicaRequest to the replica (w/o deletion)* --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and*   UpdateMetadata request for the partition to every live broker.** OfflineReplica -> ReplicaDeletionStarted* --send StopReplicaRequest to the replica (with deletion)** ReplicaDeletionStarted -> ReplicaDeletionSuccessful* -- mark the state of the replica in the state machine** ReplicaDeletionStarted -> ReplicaDeletionIneligible* -- mark the state of the replica in the state machine** ReplicaDeletionSuccessful -> NonExistentReplica* -- remove the replica from the in memory partition replica assignment cache* @param partitionAndReplica The replica for which the state transition is invoked* @param targetState The end state that the replica should be moved to*/def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,callbacks: Callbacks) {val topic = partitionAndReplica.topicval partition = partitionAndReplica.partitionval replicaId = partitionAndReplica.replicaval topicAndPartition = TopicAndPartition(topic, partition)// Replica 不存在的话,状态初始化为 NonExistentReplicaval currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)val stateChangeLog = stateChangeLogger.withControllerEpoch(controller.epoch)try {def logStateChange(): Unit =stateChangeLog.trace(s"Changed state of replica $replicaId for partition $topicAndPartition from " +s"$currState to $targetState")val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)//校验状态转变是否符合要求assertValidTransition(partitionAndReplica, targetState)targetState match {case NewReplica => 其前置状态只能为 NonExistentReplica// start replica as a follower to the current leader for its partition//从 zk 获取 Partition 的 leaderAndIsr 信息val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)leaderIsrAndControllerEpochOpt match {case Some(leaderIsrAndControllerEpoch) =>//若是leader的replica状态不能变为NewReplicaif(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)throw new StateChangeFailedException(s"Replica $replicaId for partition $topicAndPartition cannot " +s"be moved to NewReplica state as it is being requested to become leader")//向该 replicaId 发送 LeaderAndIsr 请求,这个方法同时也会向所有的 broker 发送 updateMeta 请求brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),topic, partition, leaderIsrAndControllerEpoch,replicaAssignment, isNew = true)//对于新建的 Partition,处于这个状态时,该 Partition 是没有相应的 LeaderAndIsr 信息的case None => // new leader request will be sent to this replica when one gets elected}//将该 Replica 的状态转移成 NewReplica,然后结束流程。replicaState.put(partitionAndReplica, NewReplica)logStateChange()case ReplicaDeletionStarted => //其前置状态只能为 OfflineReplica//更新向该 Replica 的状态为 ReplicaDeletionStarted;replicaState.put(partitionAndReplica, ReplicaDeletionStarted)// send stop replica command//发送 StopReplica 请求给该副本,并设置 deletePartition=true//broker收到这请求后,会从物理存储上删除这个 Replica 的数据内容brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,callbacks.stopReplicaResponseCallback)logStateChange()case ReplicaDeletionIneligible => //其前置状态只能为 ReplicaDeletionStartedreplicaState.put(partitionAndReplica, ReplicaDeletionIneligible)logStateChange()case ReplicaDeletionSuccessful => //其前置状态只能为 ReplicaDeletionStartedreplicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)logStateChange()case NonExistentReplica => //其前置状态只能为 ReplicaDeletionSuccessful。// NonExistentReplica 是副本完全删除、不存在这个副本的状态// remove this replica from the assigned replicas list for its partition//在 controller 的 partitionReplicaAssignment 删除这个 Partition 对应的 replica 信息;val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))//将这个 Topic 从缓存中删除。replicaState.remove(partitionAndReplica)logStateChange()case OnlineReplica =>//其前置状态只能为 NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible//副本正常工作时的状态,此时的 Replica 既可以作为 leader 也可以作为 followerreplicaState(partitionAndReplica) match {case NewReplica => //其前置状态如果为 NewReplica// add this replica to the assigned replicas list for its partition//从 Controller 的 partitionReplicaAssignment 中获取这个 Partition 的 AR;val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)//如果 Replica 不在 AR 中的话,那么就将其添加到 Partition 的 AR 中;if(!currentAssignedReplicas.contains(replicaId))controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)logStateChange()case _ => //其前置状态如果为:OnlineReplica, OfflineReplica, ReplicaDeletionIneligible// check if the leader for this partition ever existed//如果该 Partition 的 LeaderIsrAndControllerEpoch 信息存在,那么就更新副本的状态,并发送相应的请求//否则不做任何处理;controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {case Some(leaderIsrAndControllerEpoch) =>brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,replicaAssignment)replicaState.put(partitionAndReplica, OnlineReplica)logStateChange()case None => // that means the partition was never in OnlinePartition state, this means the broker never// started a log for that partition and does not have a high watermark value for this partition}}//最后将 Replica 的状态设置为 OnlineReplica 状态。replicaState.put(partitionAndReplica, OnlineReplica)case OfflineReplica => //其前置状态只能为 NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible// send stop replica command to the replica so that it stops fetching from the leader//发送 StopReplica 请求给该副本,先停止副本同步 (deletePartition = false)brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)// As an optimization, the controller removes dead replicas from the ISRval leaderAndIsrIsEmpty: Boolean =controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {case Some(_) =>//将该 replica 从 Partition 的 isr 移除这个 replica(前提 isr 中还有其他有效副本)controller.removeReplicaFromIsr(topic, partition, replicaId) match {case Some(updatedLeaderIsrAndControllerEpoch) =>// send the shrunk ISR state change request to all the remaining alive replicas of the partition.val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)if (!controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition)) {// 发送 LeaderAndIsr 请求给剩余的其他副本,因为 ISR 变动了brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)}//更新这个 Replica 的状态为 OfflineReplicareplicaState.put(partitionAndReplica, OfflineReplica)logStateChange()falsecase None =>true}case None =>true}if (leaderAndIsrIsEmpty && !controller.topicDeletionManager.isPartitionToBeDeleted(topicAndPartition))throw new StateChangeFailedException(s"Failed to change state of replica $replicaId for partition $topicAndPartition since the leader " +s"and isr path in zookeeper is empty")}}catch {case t: Throwable =>stateChangeLog.error(s"Initiated state change of replica $replicaId for partition $topicAndPartition from " +s"$currState to $targetState failed", t)}}

上面 Replica 各种转移的触发的条件:
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3268328.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

day05 Router、vuex、axios

配置 router和vuex需要在创建vue项目的时候&#xff0c;开始的时候选择Manually select features&#xff0c;于是就可以在下一个创建配置讯问中选择router和vuex。 axios则需要执行命令行&#xff1a; npm install axios -S 之后再在需要发送请求的view导入即可。 router…

某量JS逆向

https://chat.sensetime.com/wb/chat 目录 一、发起请求 二、观察发现只有入参 __data__ 进行了加密&#xff0c;返回是明文 三、 观察JS调用栈 四、从JS中搜索 __data__ 五、使用XHR对Ajax请求进行断点 六、再次发起请求就会断点拦住请求 七、对XHR入口分析 八、逐个…

挑选六西格玛管理咨询公司主要看什么

六西格玛作为一套被全球众多顶尖企业验证过的卓越绩效管理模式&#xff0c;正成为众多企业转型升级的利器。然而&#xff0c;面对市场上琳琅满目的六西格玛管理咨询公司&#xff0c;如何慧眼识珠&#xff0c;挑选出最适合自己企业的合作伙伴呢&#xff1f; 1、团队实力与专家阵…

昇思25天学习打卡营第25天|基于 MindSpore 实现 BERT 对话情绪识别

基于 MindSpore 实现 BERT 对话情绪识别 模型概述 BERT&#xff08;双向编码器表征量&#xff09;是Google于2018年发布的一种先进语言模型&#xff0c;基于Transformer架构&#xff0c;具备双向上下文理解功能。BERT的预训练方法创新性地结合了两种任务&#xff1a; Masked …

[Vulnhub] Raven2 PHPMailer-RCE+MSQP:Mysql权限提升

信息收集 IP AddressOpening Ports192.168.101.160TCP:22,80,111,46606 $ nmap -p- 192.168.101.160 --min-rate 1000 -sC -sV PORT STATE SERVICE VERSION 22/tcp open ssh OpenSSH 6.7p1 Debian 5deb8u4 (protocol 2.0) | ssh-hostkey: | 1024 26:81:c1:f…

【ROS2】演示:为有损网络使用服务质量设置

目录 背景 先决条件 运行演示 命令行选项 添加网络流量 背景 请阅读有关 QoS 设置的文档页面&#xff0c;以获取有关 ROS 2 中可用支持的背景信息。 在这个演示中&#xff0c;我们将生成一个发布相机图像的节点和另一个订阅图像并在屏幕上显示图像的节点。然后&#xff0c;我们…

【JVM基础08】——类加载器-说一下类加载的执行过程?

目录 1- 引言&#xff1a;类加载的执行过程1-1 类加载的执行过程是什么&#xff1f;(What) 2- ⭐核心&#xff1a;详解类加载的执行过程(How)2-1 加载——>加载到运行时数据区2-2 验证——>类的安全性检查2-3 准备——>为类变量分配内存并设置初始值2-4 解析——>把…

Mysql explain 优化解析

explain 解释 select_type 效率对比 MySQL 中 EXPLAIN 语句的 select_type 列描述了查询的类型,不同的 select_type 类型在效率上会有所差异。下面我们来比较一下各种 select_type 的效率: SIMPLE: 这是最简单的查询类型,表示查询不包含子查询或 UNION 操作。 这种查询通常是…

html+css 实现水波纹按钮

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;今天给大家分享htmlcss 绚丽效果&#xff01;并提供具体代码帮助大家深入理解&#xff0c;彻底掌握&#xff01;创作不易&#xff0c;如果能帮助到大家或者给大家一些灵感和启发&#xff0c;欢迎收藏关注哦 &#x1f495; 文…

python机器学习8--网络

1.超文本传输协议HTTP GET 在实际开发应用程序时&#xff0c;一定都会利用WiFi网络进行连接&#xff0c;再通过HTTP的方式读入后台的数据&#xff0c;并下载和显示在用户的PC上。这靠的是网络服务的技术&#xff0c;也就是大家提到的Web Service。而与HTTP服务器交换数据有两种…

STM32-寄存器时钟配置指南

目录 启动 SystemInit SetSysClock 总结 启动 从startup_stm32f0xx.s内的开头的Description可以看到 ;* Description : STM32F051 devices vector table for EWARM toolchain. ;* This module performs: ;* - Set the in…

Java解析epub电子书文件实战demo

如何使用 Java、Spring Boot 和 Epublib 库解析存储在阿里云对象存储服务&#xff08;OSS&#xff09;上的 EPUB 文件。这里将指导您完成设置必要依赖项、配置 OSS 客户端以及编写服务以读取和提取 EPUB 文件章节的全过程。 步骤1&#xff1a;添加依赖项 首先&#xff0c;将 E…

08 字符串和字节串

使用单引号、双引号、三单引号、三双引号作为定界符&#xff08;delimiter&#xff09;来表示字符串&#xff0c;并且不同的定界符之间可以相互嵌套。 很多内置函数和标准库对象也都支持对字符串的操作。 x hello world y Python is a great language z Tom said, "Le…

【ESP01开发实例】-ESP-01开发环境搭建与固件烧录

ESP-01开发环境搭建与固件烧录 文章目录 ESP-01开发环境搭建与固件烧录1、ESP-01介绍2、开发环境搭建3、固件下载3.1 使用 Arduino UNO 板对 ESP8266 (ESP-01) 模块进行编程3.2 使用USB 转串口转换器对 ESP8266 (ESP-01) 模块进行编程4、点亮LED本文将详细介绍如何使用 Arduino…

springboot在加了mapper之后报错

springboot在加了mapper之后报错 最后发现是spring boot版本不兼容&#xff0c;spring-boot-starter-parent换成3.0.5之后可以了

Asp .Net Core 系列:详解授权以及实现角色、策略、自定义三种授权和自定义响应

什么是授权&#xff08;Authorization&#xff09;&#xff1f; 在 ASP.NET Core 中&#xff0c;授权&#xff08;Authorization&#xff09;是控制对应用资源的访问的过程。它决定了哪些用户或用户组可以访问特定的资源或执行特定的操作。授权通常与身份验证&#xff08;Auth…

【Git-驯化】一文搞懂git中rm命令的使用技巧

【Git-驯化】一文搞懂git中rm命令的使用技巧 本次修炼方法请往下查看 &#x1f308; 欢迎莅临我的个人主页 &#x1f448;这里是我工作、学习、实践 IT领域、真诚分享 踩坑集合&#xff0c;智慧小天地&#xff01; &#x1f387; 免费获取相关内容文档关注&#xff1a;微信公…

JS+H5在线文心AI聊天(第三方接口)

源码在最后面 调用的不是文心官方接口 可以正常聊天 有打字动画 效果图 源代码 <!DOCTYPE html> <html lang"zh"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-s…

OpenSSL学习笔记及在项目中的使用

OpenSSL官方命令手册&#xff1a;OpenSSL commands - OpenSSL Documentation 参考教程&#xff1a; 操作&#xff1a;OpenSSL的基本使用教程(一&#xff09;_openssl.exe使用教程-CSDN博客 操作&#xff1a;Linux和Shell回炉复习系列文章总目录 - 骏马金龙 - 博客园 (cnblog…

vscode 调试web后端

1、调试环境配置 一、安装python环境管理器 其中要先在vscode选择对应的python环境&#xff0c;最方便的是按照环境管理器后从中选择。其中在【externsions】里面安装python即可。 如下&#xff1a; 二、编写launch.json文件 其中如下&#xff1a; {// Use IntelliSense …