Kafka入门二——SpringBoot连接Kafka示例

实现

1.引入maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.9</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.example</groupId><artifactId>kafka-demo</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-demo</name><description>kafka-demo</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><image><builder>paketobuildpacks/builder-jammy-base:latest</builder></image></configuration></plugin></plugins></build></project>

2.修改application.properties配置

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.bootstrap-servers=10.20.37.85:9092
spring.kafka.consumer.group-id=demo

通过spring.kafka.consumer.auto-offset-reset配置可以指定Kafka消费者的行为,当遇到没有初始偏移量或当前偏移量不再服务器上存在的情况时的处理策略。这个配置有以下三个可选值:

  • earliest:当消费者组中没有存储的偏移量,或者偏移量超出了可用范围时,从最早的可用消息开始消费。
  • latest(默认):当消费者组中没有存储的偏移量,或者偏移量超出了可用范围时,从最新的可用消息开始消费。
  • none:当服务器上找不到消费者的偏移量时,抛出异常。

通过spring.kafka.bootstrap-servers配置可以指定Kafka的地址。这个配置的值应该是一个或多个Kafka服务器的地址,用逗号分隔。

通过spring.kafka.consumer.group-id配置可以指定Kafka消费者的组ID。这个配置的值应该是一个字符串,用于标识消费者所属的组。

定义Configuer引入kafka Bean

@Component
public class MyConfiguer {@Beanpublic NewTopic topic() {return TopicBuilder.name("topic1").partitions(10).replicas(1).build();}
}

使用了@Bean注解,表示该方法将返回一个Spring管理的bean对象。在这个方法中,通过调用TopicBuilder类的name方法指定了主题的名称为"topic1",然后使用partitions方法设置了主题的分区数为10,使用replicas方法设置了主题的副本数为1。最后,调用build方法构建并返回了一个NewTopic对象。

定义生产者

@Component
public class KafkaProducer {@Beanpublic ApplicationRunner runner(KafkaTemplate<String, String> template) {return args -> {template.send("topic1", "test");};}
}

ApplicationRunner接口是Spring Boot提供的一个回调接口,用于在应用程序启动后执行一些操作。这段代码的作用是在应用程序启动后自动发送一条消息到Kafka的"topic1"主题,消息内容为"test"。

定义消费者

@Component
public class KafkaConsumer {@KafkaListener(id = "myId", topics = "topic1")public void listen(String in) {System.out.println(in);}
}

@KafkaListener注解用于标记方法作为Kafka消息的消费者。这段代码的作用是创建一个Kafka消费者,监听名为"topic1"的主题,并将接收到的消息内容打印到控制台。

总结

生产者

Kafka生产者发送消息的流程如下:

  • 创建ProducerRecord:首先,生产者需要创建一个ProducerRecord对象,该对象包含目标主题和要发送的内容。如果需要,还可以指定消息的键(key)或分区(partition)。

  • 序列化:生产者将ProducerRecord对象中的键和值进行序列化,转换为字节数组,以便能够在网络上传输。

  • 拦截器链:消息会通过一个由一个或多个拦截器组成的链。这些拦截器可以对消息进行预处理,例如添加头信息或实施自定义的逻辑。

  • 元数据加载:生产者在发送消息之前,需要加载Kafka集群的元数据,以确定各个主题的分区和副本分布情况。

  • 计算分区号:根据ProducerRecord中的键和分区策略(如果有的话),生产者会计算出消息应该发送到哪个分区。

  • 累加器缓存:消息被缓存到RecordAccumulator累加器中,这是一个按照批次处理消息的组件。

  • Sender线程发送:Sender线程负责将符合条件的消息批次发送给Kafka broker。除了发送消息外,Sender线程也负责更新元数据。

  • 等待确认:生产者可以选择等待broker的确认,以确保消息已经被成功接收并写入日志。

  • 处理响应:生产者处理来自broker的响应,包括错误处理和重试逻辑。

  • 完成发送:一旦消息被成功发送并且确认,生产者会继续发送下一批消息或者结束发送过程。
    在这里插入图片描述

消费者

Kafka消费者接收消息的流程涉及到多个关键步骤,确保了消息能够从broker高效地传送到消费者手中。以下是该过程的详细描述:

  • 创建消费者实例:需要创建一个Kafka消费者实例,这通常涉及到指定一些关键参数,如Kafka集群的地址、消费者组ID、反序列化器等。

  • 订阅主题:消费者需要订阅一个或多个感兴趣的主题。这意味着消费者希望从这些主题接收消息。

  • 协调消费者组:在ConsumerCoordinator的poll方法中,会有一个ensureActiveGroup的过程,消费者通过这个过程向Coordinator发送加入消费者组的请求,并等待Coordinator的响应。

  • 拉取消息:一旦加入了消费者组,消费者开始从订阅的主题中拉取(pull)消息。Kafka采用pull模式而不是push模式,这意味着消费者需要主动去服务器拉取消息。

  • 处理消息:消费者获取消息后,会进行相应的处理。这个处理过程可能包括数据转换、存储或者其他业务逻辑。

  • 提交消费位移:消费者在处理完消息后,需要提交消费位移。这是告诉Kafka它已经消费到了哪个位置,以便下次从正确的位置开始消费。位移的提交是由消费者自己管理的,Kafka提供了接口来更新这些位移。

  • 控制消费速率:消费者可以通过各种方式控制消息的消费速率,例如通过限制拉取操作的频率或者调整消费者的线程数。

  • 错误处理和重试:在消费过程中可能会遇到错误,比如网络问题或者消息格式错误。消费者需要有相应的错误处理机制来处理这些情况,并在必要时进行重试。

  • 关闭消费者:完成消息消费后,应当关闭消费者实例,释放资源。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2805889.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

软考38-上午题-【数据库】-关系模式

一、关系模式中的基本术语 关系数据库系统是支持关系模式的数据库系统。 1、关系 一个关系就是一张二维表&#xff0c;每个关系都有一个关系名。 2、元组 表中的一行即为一个元组&#xff0c;对应存储文件中的一个记录值。 3、属性 表中的列称为属性&#xff0c;每一列有一…

构建生物医学知识图谱from zero to hero (3):生物医学命名实体识别和链接

生物医学实体链接 🤓现在是激动人心的部分。对于NLP和命名实体识别和链接的新手,让我们从一些基础知识开始。命名实体识别技术用于检测文本中的相关实体或概念。例如,在生物医学领域,我们希望在文本中识别各种基因、药物、疾病和其他概念。 生物医学概念提取 在这个例子中…

《Linux C编程实战》笔记:信号量

信号量在操作系统的书里一般都有介绍&#xff0c;这里就只写书上说的了。 信号量是一个计数器&#xff0c;常用于处理进程或线程的同步问题&#xff0c;特别是对临界资源访问的同步。临界资源可以简单地理解为在某一时刻只能由一个进程或线程进行操作的资源&#xff0c;这里的…

删除遥感影像raster:另一个程序正在使用此文件,进程无法访问

问题&#xff1a; 在文件夹中删除处理过程得到的临时影像时&#xff0c;出现了上面的问题 os.remove(os.path.join(workspace2.replace(.tif, .cut.tif)))原因&#xff1a; 在文件夹中删除任何内容&#xff0c;比如文本、图片、影像时&#xff0c;都要先关闭这个对象 解决方…

【这个词(Sequence-to-Sequence)在深度学习中怎么解释,有什么作用?】

&#x1f680; 作者 &#xff1a;“码上有前” &#x1f680; 文章简介 &#xff1a;深度学习笔记 &#x1f680; 欢迎小伙伴们 点赞&#x1f44d;、收藏⭐、留言&#x1f4ac; Sequence-to-Sequence&#xff08;Seq2Seq&#xff09; Sequence-to-Sequence&#xff08;Seq2Seq…

加密函数f

1. 加密函数f 加密函数f(Ri-1,Ki)是DES中的核心算法,该函数包含选择运算E、异或运算、代替函数组S(S盒变换)、置换运算P,其流程如图所示。 (1)选择运算E与异或运算。选择运算E就是把Ri-1的32位扩展到48位,并与48位子秘钥Ki进行异或运算。具体扩展方式如图所示。 (2…

WordPres Bricks Builder 前台RCE漏洞

免责声明&#xff1a;文章来源互联网收集整理&#xff0c;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;所产生的一切不良后果与文章作者无关。该…

持续集成,持续交付和持续部署的概念,以及GitLab CI / CD的介绍

引言&#xff1a;上一期我们部署好了gitlab极狐网页版&#xff0c;今天我们介绍一下GitLabCI / CD 目录 一、为什么要 CI / CD 方法 1、持续集成 2、持续交付 3、持续部署 二、GitLab CI / CD简介 三、GitLab CI / CD 的工作原理 4、基本CI / CD工作流程 5、首次设置 …

视界未来:Sora领航AI视频模型的科技进步

随着人工智能技术的飞速发展&#xff0c;AI视频模型正逐渐成为视频内容创作、编辑和呈现的重要工具。在这个充满潜力的领域中&#xff0c;Sora作为一种领先的AI视频模型&#xff0c;正引领着科技的进步&#xff0c;并为我们带来了无限的想象空间。本文将探讨Sora在科技进步中的…

SpringBoot+PDF.js实现按需分片加载预览(包含可运行示例源码)

SpringBootPDF.js实现按需分片加载预览 前言分片加载的效果前端项目前端项目结构前端核心代码前端项目运行 后端项目后端项目结构后端核心代码后端项目运行 项目运行效果首次访问分片加载 项目源码 前言 本文的解决方案旨在解决大体积PDF在线浏览加载缓慢、影响用户体验的难题…

什么是IP地址,IP地址详解

在互联网的世界中&#xff0c;每一台连接的设备都需要一个独特的标识&#xff0c;这就是IP地址。IP地址&#xff0c;全称为“Internet Protocol Address”&#xff0c;即互联网协议地址&#xff0c;它是网络中进行数据传输的基础。下面&#xff0c;我们将对IP地址进行详细的解析…

幻兽帕鲁(Palworld 1.4.1)私有服务器搭建(docker版)

文章目录 说明客户端安装服务器部署1Panel安装和配置docker服务初始化设置设置开机自启动设置镜像加速 游戏服务端部署游戏服务端参数可视化配置 Palworld连接服务器问题总结 说明 服务器硬件要求&#xff1a;Linux系统/Window系统&#xff08;x86架构&#xff0c;armbian架构…

Vue+SpringBoot打造社区买菜系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、系统设计2.1 功能模块设计2.1.1 数据中心模块2.1.2 菜品分类模块2.1.3 菜品档案模块2.1.4 菜品订单模块2.1.5 菜品收藏模块2.1.6 收货地址模块 2.2 可行性分析2.3 用例分析2.4 实体类设计2.4.1 菜品分类模块2.4.2 菜品档案模块2.4.3…

在openEuler中通过KVM可视化安装华为FusionCompute的CNA主机

一、环境说明 在Windows物理主机上通过VMware WorkStation创建一个虚拟机&#xff08;4U4C、16GB内存&#xff0c;400GB磁盘&#xff0c;NAT网络连接&#xff09;&#xff0c;在虚拟机中安装openEuler 22.03 LTS系统&#xff0c;并将该虚拟机作为部署 FusionCompute的服务器&a…

maven3下载地址(含旧版本)

因为现有的3.8版本与IDEA不兼容&#xff0c;我需要下载3.6版本&#xff0c;但是官网的位置非常隐蔽&#xff0c;找了很多资料才看到。故记录一下。Index of /dist/maven/maven-3 选择需要的版本 选择binaries 选择zip文件下载就可以了

Modern C++ std::visit从实践到原理

前言 std::visit 是 C17 中引入的一个模板函数&#xff0c;它用于对给定的 variant、union 类型或任何其他兼容的类型执行一个访问者操作。这个函数为多种可能类型的值提供了一种统一的访问机制。使用 std::visit&#xff0c;你可以编写更通用和灵活的代码&#xff0c;而无需关…

静态时序分析:SDC约束命令set_input_transition详解

相关阅读 静态时序分析https://blog.csdn.net/weixin_45791458/category_12567571.html DC工具在使用set_drive和set_driving_cell建模输入端口驱动能力时&#xff0c;会自动计算输入端口的转换时间&#xff0c;以及由于电阻或驱动单元带来的额外输入端口延迟。 set_input_tra…

Linux基础知识——命令行模式下命令的执行

文章目录 Linux基础知识——命令行模式下命令的执行开始执行Linux命令Linux基础命令的操作常用Linux命令行操作按键Linux输出错误信息查看 Linux系统在线帮助--help选项man命令info命令其他有用的文件文档百度搜索 文本编辑器&#xff1a;nanonano启动&#xff01; 正确关机方法…

Three.js加载PLY文件

这是官方的例子 three.js webgl - PLY 我在Vue3中使用&#xff0c;测试了好久始终不显示点云数据。在网上查询后发现ply文件要放置在public目录下才行 <el-row><el-button type"primary" class"el-btn" click"IniThree1">PLY</…

【ArcGIS微课1000例】0104:二位面状数据转三维多面体(建筑物按高度拉伸)

文章目录 一、加载数据二、添加高度字段三、三维拉伸显示四、生成三维体数据五、注意事项一、加载数据 打开ArcScene,加载配套实验数据(0104.rar中的二维建筑物矢量数据,订阅专栏,获取专栏所有文章阅读权限及配套数据),如下图所示: 二、添加高度字段 本实验将二维数据…