Kafka-02 @KafkaListener学习

一. 引入依赖

SpringBoot 和 Kafka 搭配使用的场景,引入 spring-kafka 即可;

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version>
</dependency>

二. 核心结构

先来看一下 spring-kafka 核心图;

当我们在 Spring 中注册一个 Listener,框架就会为我们自动生成一个对应的 ConcurrentMessageListenerContainer 容器来管理,再根据你配置的并发度来创建多个 KafkaMessageListenerContainer 容器,每个 KafkaMessageListenerContainer 可以粗浅的认为是一个线程,这个线程会不断向 server 端发起 poll 请求来实现监听;

  • ConcurrentMessageListenerContainer 是通过 ConcurrentMessageListenerContainerFactory 生产的;一般我们不需要去自定义 ConcurrentMessageListenerContainerFactory,Spring 容器会生成默认的 ConcurrentMessageListenerContainerFactory,也有场景需要我们去自定义 ContainerFactory;

  • ConcurrentMessageListenerContainer 中有一个属性 List<KafkaMessageListenerContainer<K, V>> containers,就是用来存放各个 KafkaMessageListenerContainer;需要厘清两者的关系;

在这里插入图片描述

三. 核心流程

先来看一下核心方法的调用流程图,略去了部分非核心流程;

执行流程如下:

  1. Spring 启动;
  2. Spring 生命周期为 finishRefresh() 时,调用 KafkaListenerEndpointRegistry 中的 start();
  3. 根据 @KafkaListener 创建对应数量的 ConcurrentMessageListenerContainer;
  4. 根据并发配置 concurrency 往 ConcurrentMessageListenerContainer 创建对应数量的 KafkaMessageListenerContainer;
  5. 在每个 KafkaMessageListenerContainer 中创建一个 SimpleAsyncTaskExecutor,值得注意的是 SimpleAsyncTaskExecutor 的作用是创建一条新的线程,并在线程停止时执行 stop();
  6. 创建一个 ListenerConsumer 注册到 SimpleAsyncTaskExecutor 中,这里的 ListenerConsumer 是一个 Runnable 对象,并且内部会创建聚合一个 KafkaConsumer 对象,SimpleAsyncTaskExecutor 中创建出的线程会执行 ListenerConsumer.run();
  7. ListenerConsumer 的 run() 被调用;
  8. run 中开启自旋;
  9. 不断调用 kafka-client 提供的 poll() 拉取新的消息;
    • 收到新的消息就执行,执行完了就继续自旋;
    • 收不新消息,重启下一轮自旋;

四. 分析

1. 启动入口

入口在 SpringApplication.run() -> SpringApplication.refreshContext() -> AbstractApplicationContext.refresh() -> AbstractApplicationContext.finishRefresh();

这个 finishRefresh() 中会调用 LifecycleProssor.onRefresh() 启动 kafka 监听器;

// ------------------------------ AbstractApplicationContext ----------------------------
protected void finishRefresh() {clearResourceCaches();initLifecycleProcessor();// 调用 LifecycleProcessor.onRefresh(),Spring 中默认的是 DefaultLifecycleProcessorgetLifecycleProcessor().onRefresh();publishEvent(new ContextRefreshedEvent(this));if (!NativeDetector.inNativeImage()) {LiveBeansView.registerApplicationContext(this);}
}// ------------------------------ DefaultLifecycleProcessor ----------------------------
public void onRefresh() {startBeans(true);this.running = true;
}// ------------------------------ DefaultLifecycleProcessor ----------------------------
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {Lifecycle bean = lifecycleBeans.remove(beanName);if (bean != null && bean != this) {String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);for (String dependency : dependenciesForBean) {doStart(lifecycleBeans, dependency, autoStartupOnly);}if ((!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {try {// 获取容器中的 LifeCycle bean 对象,调用它的 start()// SpringKafka 中对应的是 KafkaListenerEndpointRegistry// 我们重点看一下 KafkaListenerEndpointRegistry.start()bean.start();}catch (Throwable ex) {throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);}}}
}

2. KafkaListenerEndpointRegistry

KafkaListenerEndpointRegistry 是 SpringKafka 中很重要的类,是一个 SmartLifecycle 实现类对象,它里面有一个属性 listenerContainers,存放了我们的 ConcurrentMessageListenerContainer 对象;

我们先看它的 start();

// ---------------------------- KafkaListenerEndpointRegistry ---------------------------
public void start() {// 轮询所有的 ConcurrentMessageListenerContainer 对象// 执行 ConcurrentMessageListenerContainer.start()for (MessageListenerContainer listenerContainer : getListenerContainers()) {startIfNecessary(listenerContainer);}this.running = true;
}// ---------------------------- KafkaListenerEndpointRegistry ---------------------------
private void startIfNecessary(MessageListenerContainer listenerContainer) {if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) {// 执行 ConcurrentMessageListenerContainer.start()listenerContainer.start();}
}// ---------------------------- AbstractMessageListenerContainer ---------------------------
public final void start() {checkGroupId();synchronized (this.lifecycleMonitor) {if (!isRunning()) {// 调用真正干事的 doStart(),进入 ConcurrentMessageListenerContainer.doStart()doStart();}}
}

我们看 ConcurrentMessageListenerContainer.doStart() 干了些啥;

3. ConcurrentMessageListenerContainer

我们看下 ConcurrentMessageListenerContainer.doStart() 干了啥;

// ---------------------------- ConcurrentMessageListenerContainer ---------------------------
protected void doStart() {if (!isRunning()) {checkTopics();ContainerProperties containerProperties = getContainerProperties();TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();if (topicPartitions != null && this.concurrency > topicPartitions.length) {this.concurrency = topicPartitions.length;}setRunning(true);// 1. 根据 @KafkaListener 中配置的 concurrency 轮询for (int i = 0; i < this.concurrency; i++) {// 2. 创建 KafkaMessageListenerContainerKafkaMessageListenerContainer<K, V> container =constructContainer(containerProperties, topicPartitions, i);// 3. 对刚创建出的 KafkaMessageListenerContainer 做一些配置configureChildContainer(i, container);if (isPaused()) {container.pause();}// 4. 启动 KafkaMessageListenerContainercontainer.start();// 5. 将 KafkaMessageListenerContainer 添加到 ConcurrentMessageListenerContainer 中this.containers.add(container);}}
}

关键流程是第 3 步和第 4 步,我们分开来看;

3.1 configureChildContainer()

对刚创建出的 KafkaMessageListenerContainer 做一些配置;

这里创建了一个 SimpleAsyncTaskExecutor,设置进 KafkaMessageListenerContainer 中;

private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {String beanName = getBeanName();beanName = (beanName == null ? "consumer" : beanName) + "-" + index;container.setBeanName(beanName);ApplicationContext applicationContext = getApplicationContext();if (applicationContext != null) {container.setApplicationContext(applicationContext);}ApplicationEventPublisher publisher = getApplicationEventPublisher();if (publisher != null) {container.setApplicationEventPublisher(publisher);}// 设置 clinetIdSuffix,clientId 前缀container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + index : "");container.setGenericErrorHandler(getGenericErrorHandler());container.setCommonErrorHandler(getCommonErrorHandler());container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.setRecordInterceptor(getRecordInterceptor());container.setBatchInterceptor(getBatchInterceptor());container.setInterceptBeforeTx(isInterceptBeforeTx());container.setListenerInfo(getListenerInfo());AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();if (exec == null) {// 1. 创建出 SimpleAsyncTaskExecutor,并加入到 this.executorsexec = new SimpleAsyncTaskExecutor(beanName + "-C-");this.executors.add(exec);// 2. 将当前创建的 SimpleAsyncTaskExecutor 设置到 KafkaMessageListenerContainercontainer.getContainerProperties().setConsumerTaskExecutor(exec);}
}

3.2 container.start()

调用 KafkaMessageListenerContainer 的 start(),最终调用 KafkaMessageListenerContainer.doStart();

protected void doStart() {if (isRunning()) {return;}ContainerProperties containerProperties = getContainerProperties();checkAckMode(containerProperties);Object messageListener = containerProperties.getMessageListener();AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();if (consumerExecutor == null) {consumerExecutor = new SimpleAsyncTaskExecutor((getBeanName() == null ? "" : getBeanName()) + "-C-");containerProperties.setConsumerTaskExecutor(consumerExecutor);}GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;ListenerType listenerType = determineListenerType(listener);// 1. 创建 ListenerConsumer// ListenerConsumer 是一个 Runnable 对象// new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员// 它的 run() 比较重要this.listenerConsumer = new ListenerConsumer(listener, listenerType);setRunning(true);this.startLatch = new CountDownLatch(1);// 2. 将 ListenerConsumer 任务放入到 SimpleAsyncTaskExecutor 中异步调用this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);
}

ListenerConsumer 是一个 Runnable 对象,new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员,我们看下 ListenerConsumer.run();

4. ListenerConsumer.run()

我们看下 ListenerConsumer 的 run();可以看到这个任务会进入自旋去处理任务;

public void run() {ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent();this.consumerThread = Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count = 0;this.last = System.currentTimeMillis();initAssignedPartitions();publishConsumerStartedEvent();Throwable exitThrowable = null;// 开启自旋while (isRunning()) {// 通过 KafkaConsumer 向 kafka-server 发起 poll 请求pollAndInvoke();}wrapUp(exitThrowable);
}

ListenerConsumer 的 pollAndInvoke() 比较绕,总之我们知道它会通过反射调用我们 @KafkaListener 声明的方法;

我们简单看下最终调我们 @KafkaListener 声明方法的地方;

4.1 HandlerAdapter.invoke()

调用到 RecordMessagingMessageListenerAdapter.invoke();

public Object invoke(Message<?> message, Object... providedArgs) throws Exception {if (this.invokerHandlerMethod != null) {// 最终的执行入口// 最后会通过反射调用我们的 @KafkaListener 声明的方法return this.invokerHandlerMethod.invoke(message, providedArgs);} else if (this.delegatingHandler.hasDefaultHandler()) {Object[] args = new Object[providedArgs.length + 1];args[0] = message.getPayload();System.arraycopy(providedArgs, 0, args, 1, providedArgs.length);return this.delegatingHandler.invoke(message, args);} else {return this.delegatingHandler.invoke(message, providedArgs);}
}

至此,SpringKafka 分析完毕;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3226087.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

vs2022报找不到.netframework,version=V4.8的引用程序集

最近在win7上面安装vs2022 17.6版本&#xff0c;打开.net项目编译的时候出现了这个提示。 解决方案就是安装.net4.8开发者工具包&#xff0c;不是运行工具包。 安装完成之后在项目的属性中修改&#xff1a; 点击下载&#xff1a;.net4.8开发者工具包

HTML+CSS+JS 实现3D风吹草动效果(B站视频)

效果&#xff1a; 代码&#xff1a; <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>3D effect&…

CAD应用程序开发工具CST CAD Navigator 1.4.0.1 正式发布—— 带来了 G 代码生成功能

CST CAD Navigator是一款兼容Windows和Linux的CAD应用程序。在其简单的界面下&#xff0c;有一个可以快速查看2D图纸和3D模型的强大核心。软件可以轻松地导入和导出文件&#xff0c;获取尺寸&#xff0c;并创建截面视图。 下载最新版CST CAD Navigatorhttps://www.evget.com/p…

SpringIOC原理

SpringIOC原理 1.概念 Spring通过一个配置文件描述Bean及Bean之间的依赖关系&#xff0c;利用Java语言的反射功能实例化Bean并建立Bean之间的依赖关系。Spring的IOC容器在完成这些底层工作的基础上&#xff0c;还提供了Bean实例缓存、生命周期管理、Bean实例代理、事件发布、…

台灯的种类有哪些?五款护眼台灯测评分析

随着时代的发展&#xff0c;现在市面上出现了更为护眼的选择——LED台灯。然而&#xff0c;面对琳琅满目的产品&#xff0c;挑选一款合适的台灯似乎成了一项挑战。那么&#xff0c;我们应该如何从众多选项中&#xff0c;找到那款既实用又护眼的台灯呢&#xff1f;以下内容将为您…

一个项目学习Vue3---Vue3中自带的事件

1. .stop 阻止事件继续传播&#xff0c;即防止事件冒泡到父元素。 <div click.stop"handleClick">点击我</div> 2. .prevent 阻止事件的默认行为&#xff0c;比如阻止表单提交时的页面刷新。 <form submit.prevent"handleSubmit">阻…

替你测试过了,这些国产大模型都很强,快来试试哪款适合你

替你测试过了&#xff0c;这些国产大模型都很强&#xff0c;快来试试哪款适合你 &#x1f4a1;&#x1f525; 大家好&#xff0c;我是猫头虎&#xff0c;科技自媒体博主。今天我将为大家介绍几款顶尖的国产AI大模型&#xff0c;它们各有所长&#xff0c;看看哪一款更适合你的需…

esp8266+micropython+irsend红外发射调试记录

在网上搜索esp8266micropython的红外发射库&#xff0c;没找到&#xff0c;发现 接收库是有的&#xff0c;可以参考&#xff1a;基于MicroPython的ESP8266连接外设IO&#xff08;二&#xff09;_micropython 红外接收-CSDN博客 可惜没有发射&#xff0c;很不方便。 这里都有介…

Echarts折线图---带颜色过度---的小demo

效果: 代码: <template><div id"lineEchtar"><div id"lineEchtars" style"min-height: 300px; width: 100%"></div></div> </template><script> import * as echarts from "echarts"; //…

PHP充电桩小程序系统源码

绿色出行新伴侣&#xff01;充电桩小程序&#xff0c;让充电不再烦恼✨ &#x1f50b; 开篇&#xff1a;告别电量焦虑&#xff0c;充电桩小程序来救场&#xff01; 在这个电动车日益普及的时代&#xff0c;电量不足成了不少车主的“心头大患”。但别担心&#xff0c;充电桩小…

手机容器化 安装docker

旧手机-基于Termux容器化 1、安装app 在手机上安装Termux或ZeroTermux&#xff08;Termux扩展&#xff09; 1.1 切换源 注&#xff1a;可以将termux进行换源&#xff0c;最好采用国内源&#xff0c;例如&#xff1a;清华源等 更新包列表和升级包&#xff08;可选&#xff0…

智能设备中的语音是如何写入语音芯片的

你是否曾好奇&#xff0c;那些智能设备中发出的清晰而自然的语音&#xff0c;是如何被巧妙地植入到微小的语音芯片中的呢&#xff1f;难道真的是通过我们日常使用的电脑吗&#xff1f;今天&#xff0c;就让我们一起探索将语音写入语音芯片的过程。 1、准备语音文件&#xff1a;…

低代码技术革新:高效构建现代人事管理系统

引言 在快速变化的商业环境中&#xff0c;企业必须不断提升其内部管理效率&#xff0c;以保持竞争力和灵活性。人事管理系统作为企业核心业务系统之一&#xff0c;承担着招聘、培训、绩效管理等重要功能&#xff0c;直接影响着企业的人才管理和运营效率。传统的人事管理系统通常…

GuLi商城-商品服务-API-品牌管理-OSS获取服务端签名

新建第三方服务: 引入common 把common中oss的依赖都拿到第三方服务中来 配置文件: 加上nacos注解:<

品牌策划学习资源全攻略:从入门到精通的推荐清单!

这里再分享一些网站书籍和杂志给大家。 TOPYS创意内容平台&#xff1a; 专注于创意内容分享&#xff0c;涵盖广告、设计、艺术等多个领域&#xff0c;是广告设计人寻找创意灵感的好去处。 Dribbble&#xff1a; 设计师社区&#xff0c;用户可以浏览到全球设计师的优秀作品&…

Centos7 安装Docker步骤及报错信息(不敢说最全,但是很全)

一、操作系统要求&#xff1a; 要安装Docker Engine&#xff0c;您需要CentOS 7及以上的维护版本。存档版本不受支持或测试。必须启用centos临时存储库。默认情况下&#xff0c;此存储库已启用&#xff0c;但如果已禁用&#xff0c;则需要重新启用它。建议使用overlay2存储驱动…

利用远程桌面进行开发,

现在的软硬件开发都涉及庞杂的软硬件环境和多种外设总线部署&#xff0c;这时我们利用远程工具和windows自带的wsl虚拟机环境再配合vscode的remote ssh远程开发模式&#xff0c;可自由的在linux windows android等平台上切换&#xff0c;让开发更顺畅&#xff0c;也可以更好的利…

掌控Camunda:深入了解camunda-engine模块

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 &#x1f38f;&#xff1a;你只管努力&#xff0c;剩下的交给时间 &#x1f3e0; &#xff1a;小破站 掌控Camunda&#xff1a;深入了解camunda-engine模块 前言Camunda-engine模块概述简介架构设计设…

初赛倒计时,第二届OPENAIGC开发者大赛作品提交开始

由联想拯救者、AIGCOPEN开放社区、英特尔联合主办的“2024 OPENAIGC开发者大赛”将于本周&#xff08;7月13、14日&#xff09;迎来线上初赛评审环节。距离评审正式开始仅剩不到一周的时间&#xff0c;在此提醒各位参赛者抓紧时间&#xff0c;尽快在7月11日24:00前完善并提交作…

深度学习之梯度消失

在深度学习中&#xff0c;梯度消失是指在反向传播过程中&#xff0c;随着网络层数增加或者在使用特定类型的激活函数&#xff08;如sigmoid函数&#xff09;时&#xff0c;梯度逐渐变小并最终趋近于零的现象。这种现象导致在更新参数时&#xff0c;底层网络的权重几乎不会得到有…