大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka (正在更新…)

章节内容

上节我们完成了:

  • topics.sh、producer.sh、consumer.sh 脚本的基本使用
  • pom.xml 配置
  • JavaAPI的使用:producer 和 consumer

在这里插入图片描述

架构图

上节已经出现过了,这里再放一次
在这里插入图片描述

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>springboot-kafka</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.2.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置文件

我们常见的配置文件如下图:

spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertemplate:default-topic: my-topic

Producer

编写代码

编写了一个KafkaProducerController
里边写了两个方法,都是使用了 KafkaTemplate 的工具。

@RestController
public class KafkaProducerController {@Resourceprivate KafkaTemplate<Integer, String> kafkaTemplate;@RequestMapping("/sendSync/{message}")public String sendSync(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 1, message);ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);try {SendResult<Integer, String> result = future.get();System.out.println(result.getProducerRecord().key() + "->" +result.getProducerRecord().partition() + "->" +result.getProducerRecord().timestamp());} catch (Exception e) {e.printStackTrace();}return "Success";}@RequestMapping("/sendAsync/{message}")public String sendAsync(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 2, message);ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败!");ex.printStackTrace();}@Overridepublic void onSuccess(SendResult<Integer, String> result) {System.out.println("发送成功");System.out.println(result.getProducerRecord().key() + "->" +result.getProducerRecord().partition() + "->" +result.getProducerRecord().timestamp());}});return "Success";}}

测试结果

http://localhost:8085/sendSync/wzktest1
http://localhost:8085/sendAsync/wzktest2
http://localhost:8085/sendAsync/wzktest222222

我们观察控制台的效果如下:
在这里插入图片描述

Consumer

编写代码

编一个类来实现Consumer:

@Configuration
public class KafkaConsumer {@KafkaListener(topics = {"wzk_topic_test"})public void consume(ConsumerRecord<Integer, String> consumerRecord) {System.out.println(consumerRecord.topic() + "\t"+ consumerRecord.partition() + "\t"+ consumerRecord.offset() + "\t"+ consumerRecord.key() + "\t"+ consumerRecord.value());}}

测试运行

2024-07-12 13:48:46.831  INFO 15352 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=13, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=h121.wzk.icu:9092 (id: 0 rack: null), epoch=0}}
2024-07-12 13:48:46.926  INFO 15352 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : wzk-test: partitions assigned: [wzk_topic_test-0]
wzk_topic_test	0	13	1	wzktest
wzk_topic_test	0	14	2	wzktest222
wzk_topic_test	0	15	2	wzktest222222

控制台的截图如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3281315.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

大厂的堡垒机到底是啥?为什么需要它?

什么是堡垒机 堡垒机&#xff0c;即在一个特定的网络环境下&#xff0c;为了保障网络和数据不受来自外部和内部用户的入侵和破坏&#xff0c;而运用各种技术手段监控和记录运维人员对网络内的服务器、网络设备、安全设备、数据库等设备的操作行为&#xff0c;以便集中报警、及…

【文件解析漏洞】实战详解!

漏洞描述&#xff1a; 文件解析漏洞是由于中间件错误的将任意格式的文件解析成网页可执行文件&#xff0c;配合文件上传漏洞进行GetShell的漏洞! IIS解析漏洞&#xff1a; IIS6.X&#xff1a; 方式一:目录解析 在网站下建立文件夹的名字为.asp/.asa 的文件夹&#xff0c;其目…

免费发送邮件两种接口方式:SMTP和邮件API

SMTP与邮件API在处理大批量邮件发送时&#xff0c;哪个更稳定&#xff1f; 在现代信息化的社会中&#xff0c;邮件已成为不可或缺的沟通工具。无论是个人还是企业&#xff0c;发送邮件都是日常工作的一部分。AokSend将详细介绍两种常用的免费发送邮件接口方式&#xff1a;SMTP…

麒麟V10系统统一认证子系统国际化

在适配麒麟V10系统统一认证子系统国际化过程中&#xff0c; 遇到了很多的问题&#xff0c;关键是麒麟官方的文档对这部分也是粗略带过&#xff0c;遇到的问题有: &#xff08;1&#xff09;xgettext无法提取C源文件中目标待翻译的字符串。 &#xff08;2&#xff09;使用msgf…

程序一调用这个接口就会崩溃, 因为他的静态库添加是放在release文件下,而我用的debug模式

程序一调用这个接口就会崩溃 因为他的静态库添加是放在release文件下 而我用的debug模式 DESTDIR ../x64/ReleaseINCLUDEPATH ./../3rdparty/ZZDecode/include LIBS -lopengl32 \-lglu32 \-luser32 \./../3rdparty/ZZDecode/x64/release/ZZDecodeInterface.lib

Python软件开发:AI毕业设计生成器引领未来

&#x1f31f; 革新软件开发&#xff1a;Python毕业设计生成器引领未来 &#x1f680; 目录 &#x1f31f; 革新软件开发&#xff1a;Python毕业设计生成器引领未来 &#x1f680;&#x1f393; 课题简介&#x1f31f; 开发目的&#x1f4c8; 开发意义 &#x1f4da; 研究方法&…

Jvm的无关性

Jvm具有无关性&#xff0c;主要体现在两个方面&#xff1a; 平台无关性&#xff1a;任何操作系统都能运行Java代码。 语言无关性&#xff1a;Jvm能运行除Java以外的其他代码。 Java源代码首先需要使用Javac编译器编译成 .class文件&#xff0c;然后由Jvm执行.class文件&…

如何准备 Java API 文档以供下游对接

个人名片 🎓作者简介:java领域优质创作者 🌐个人主页:码农阿豪 📞工作室:新空间代码工作室(提供各种软件服务) 💌个人邮箱:[2435024119@qq.com] 📱个人微信:15279484656 🌐个人导航网站:www.forff.top 💡座右铭:总有人要赢。为什么不能是我呢? 专栏导…

如何在 Odoo 16 Studio 中添加智能选项卡和管道

具有优雅定制功能的软件系统&#xff08;如 Odoo ERP&#xff09;可让客户调整和个性化其公司应用程序。定制在过去并不普遍&#xff0c;但现在对于组织来说&#xff0c;满足客户需求和需求激增至关重要。即使许多行业的竞争很少&#xff0c;但当前的竞争市场仍不稳定。尽管引入…

Mybatis批量更新数据库错误

问题&#xff1a;记录一次使用Mybatis批量更新数据库的错误&#xff0c;错误信息&#xff0c;Error updating database. Cause: org.postgresql.util.PSQLException: 错误: 字段 "update_time" 的类型为 timestamp without time zone, 但表达式的类型为 text 建议&am…

Prometheus+Grafana 监控平台实践-搭建常用服务监控告警

前言 Prometheus 是一个开放性的监控解决方案,通过各种 Exporter 采集当前主机/服务的数据,和 Grafana 相结合可以实现强大的监控和可视化功能 本篇将分享使用 docker compose 构建 Prometheus+Grafana,并监控之前文章所搭建的主机&服务,分享日常使用的一些使用经验 文…

7月速览| 卓翼飞思获荣誉、助大赛、展技术!

行业殊荣 ● 荣获 “全国低空经济先导产业行业产教融合共同体” 常务理事单位称号&#xff0c;助力打造低空经济产业领域人才智库。 “共同体”是低空经济领域&#xff0c;国家职教战略与新质生产力发展战略融合对接的重要成果。旨在汇聚优质资源&#xff0c;搭建交流平台&…

传统放牧方式与北斗科技的碰撞:北三短报文头羊定位追踪器PD28守护放牧生活

在大草原的广袤天地中&#xff0c;放牧生活是蒙古族人民的传统之一。然而&#xff0c;除了美丽和自由&#xff0c;放牧生活也伴随着一些危险。以前由于科技落后&#xff0c;人工成本低&#xff0c;主要依靠人力去放牧&#xff0c;牧民放牧顶风踏雪走个几十公里都极为寻常。除了…

AI赋能交通治理:非机动车监测识别技术在城市街道安全管理中的应用

引言 城市交通的顺畅与安全是城市管理的重要组成部分。非机动车如自行车、电动车、摩托车等在城市交通中扮演着重要角色&#xff0c;但同时也带来了管理上的挑战。尤其是在机动车道上误入非机动车的现象&#xff0c;不仅影响交通秩序&#xff0c;还可能引发交通事故。思通数科…

upload-labs靶场(1-19关)

upload-labs靶场 简介 upload-labs是一个使用php语言编写的&#xff0c;专门收集渗透测试过程中遇到的各种上传漏洞的靶场。旨在帮助大家对上传漏洞有一个全面的了解。目前一共19关&#xff0c;每一关都包含着不同上传方式。 注意&#xff1a;能运行<?php phpinfo();?&…

Linux的软硬连接

目录 见一下软硬连接 特征 什么是软硬连接&#xff0c;有什么用 软连接有什么用 硬连接有什么用 总结 见一下软硬连接 建立一个软连接 建立硬连接 特征 1. 软连接是一个独立的文件&#xff0c;因为有自己的inode号&#xff0c;由上图可知&#xff1b; 软连接的内容&…

怎麼使用ixbrowser指紋流覽器?

ixBrowser是一款指紋流覽器流覽器&#xff0c;利用指紋隔離技術確保在與Pixelscan等第三方檢測網站進行測試時具有出色的通過率&#xff0c;能夠輕鬆管理多個獨立帳戶。此外&#xff0c;ixBrowser能夠創建無限的獨立個人資料並邀請團隊成員。簡化了運營&#xff0c;降低了運營成…

java基础概念07-switch语句

一、switch定义 二、基本语法 switch (expression) { case value1: // 当expression的值等于value1时执行的代码 break; // 可选 case value2: // 当expression的值等于value2时执行的代码 break; // 可选 // 你可以有任意数量的case语句 default: // 可选 // 当没有…

计算机毕业设计Hadoop+Spark旅游景点可视化 旅游景点推荐系统 景区游客满意度预测与优化 Apriori算法 景区客流量预测 旅游大数据 景点规划

### 开题报告 **论文题目&#xff1a;** 基于Spark的旅游景点可视化系统的设计与实现 **研究背景与意义&#xff1a;** 随着旅游业的快速发展&#xff0c;人们对旅游信息的获取和处理需求越来越高。传统的旅游信息系统虽然能够提供静态的数据查询和展示功能&#xff0c;但在…

Scrapy 爬取旅游景点相关数据(七):利用指纹实现“不重复爬取”

本期学习&#xff1a; 利用网页指纹去重 众所周知&#xff0c;代理是要花钱的&#xff0c;那么在爬取&#xff08;测试&#xff09;巨量网页的时候&#xff0c;就不可能对已经爬取过的网站去重复的爬&#xff0c;这样会消耗大量的时间&#xff0c;更重要的是会消耗大量的IP (金…