spring boot 集成kafka ,并且实现 发送信息,进行消费信息(亲测有效)

目录

  • 1 目标
  • 2 实现

1 目标

有一个spring boot 项目,现在要集成kafka ,并且要实现 生产者,消费者信息;

前提是我们要有一个kafka 软件,也就是kafka 是一个软件,我们得安装成功,并且可以访问

kafka windows版本的下载安装,并且本地使用(亲测有效)

以上安装成功之后,我们可以使用软件链接一下,确保我们安装这个软件成功

在这里插入图片描述
在这里插入图片描述

显示绿色,就是链接本地的kafka 成功

2 实现

既然本地kafka 已经安装成功,接下来我们自己的springboot 项目要集成这个kaffa ,其实和我们项目集成redis 操作一样

第一步,加入依赖

        <!--        kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.4.RELEASE</version></dependency>

第二步,yml 里面添加配置

spring:kafka:# kafka 所在IP 与 端口bootstrap-servers: 127.0.0.1:9092producer:retries: 3  # 重试次数batch-size: 16384 # 批量大小buffer-memory: 33554432 # 生产端缓冲区大小key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常auto-offset-reset: earliest# 是否自动提交offsetenable-auto-commit: true# 提交offset延时(接收到消息后多久提交offset)auto-commit-interval: 1000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

第三步,写生产者与消费者

在这里插入图片描述
以后我们的controller 或者 service 就调用生产者,消费者写好之后就自动监听信息,并且进行处理信息了,也就是把我们的业务逻辑写到消费者里面就可以

生产者里面的代码

package com.jing.db2word.postgresql.kafka.producer;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.tomcat.util.bcel.Const;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;/*** 发送者* */
@Slf4j
@Component
public class Producer {private ObjectMapper objectMapper = new ObjectMapper();@Autowiredprivate KafkaTemplate<String, Object> kafkaTemplate;/*** 封装一下 发送信息的底层逻辑,只是topic 不一样* @param obj  发送的具体信息*/public void geojsonSync(Object obj) {try {sendMsg("jing-test", objectMapper.writeValueAsString(obj));} catch (JsonProcessingException e) {log.error("jing-test  ===> 发生异常:", e);}}/*** 底层发送信息* @param topic 主题* @param message  具体信息* */public void sendMsg(String topic, Object message) {if (!StringUtils.isEmpty(topic)) {kafkaTemplate.send(topic, message).addCallback(success -> {// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();}, failure -> {log.error("发送消息失败:" + failure.getMessage());});}}
}

消费者里面的代码

package com.jing.db2word.postgresql.kafka.consumer;import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;/*** 数据消费: 这个类里面会监听topic,自动监听* */
@Slf4j
@Component
public class Consumer {//调用预处理方法@KafkaListener(topics = "jing-test",groupId = "jing-test")public void geojsonProcess(String message) {try {log.info("接收消息成功:{}:", message);message = message.replace("\"", "").replace("\"", "");//c1DExampleFc.addGeojson(message);System.out.println("我监听到了:"+message);}catch (Exception e){log.error("jing-test  ===> 发生异常:", e.getMessage());}}
}

第五步,写controller

@Api(tags = "kafka测试")
@RestController
@RequestMapping("/kafka")
public class KafkaController {@Autowiredprivate Producer producer;@ApiOperation(value = "发送kafka", httpMethod = "POST")@PostMapping("/sendKafka")public AjaxResult sendKafka(String code) {producer.geojsonSync(code);return AjaxResult.success("成功");}
}

以上就写好了,我们进行测试

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
发现控制台也输出流,kafka 软件里面也有记录,所以我们项目就集成了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3015188.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

算法课程笔记——二维DP

算法课程笔记——二维DP

什么是电脑监控软件?哪些监控软件好用?

电脑监控软件是一种用于监控和管理计算机系统和数据的工具。它可以对计算机的使用情况进行实时监控&#xff0c;记录用户的操作行为&#xff0c;并及时发出警报&#xff0c;以防止数据泄露、违规操作和其他安全问题的发生。在当今信息时代&#xff0c;保护企业和个人信息安全变…

AI编码时代到来?实现编程梦想的利器—Baidu Comate测评

文章目录 Comate智能编码是什么&#xff1f;Comate支持的环境 Comate应用安装实际操作对话式生成代码生成代码注释智能单测项目测试调优功能 总结 Comate智能编码是什么&#xff1f; 在如今这个拥抱AI的时代&#xff0c;市面上已经产出了很多Ai代码助手&#xff0c;如果你还没…

TradingView 使用方法

【前言】最近项目中用到了Tradingview中的K线图,基于以前从未使用过,写此篇文章记录一下Tradingview的使用。 【目标】 1 会使用Tradingview中k线图的渲染方式 2 了解一些基本的用法 一 简介 Tradingview是一个价格图表和分析软件,提供免费和付费选项,为优秀的交易技术分析…

java中的变量、数据类型、人机交互

变量 变量要素 1、类型&#xff1b;每一个变量都需要定义类型&#xff08;强类型&#xff09;其它语言有弱类型&#xff08;js&#xff09; 2、变量名&#xff1b; 3、存储的值&#xff1b; 声明方式&#xff1a; 数据类型 变量名 变量值&#xff1b; public static vo…

电商大数据的采集||电商大数据关键技术【基于Python】

.电商大数据采集API 什么是大数据&#xff1f; 1.大数据的概念 大数据即字面意思&#xff0c;大量数据。那么这个数据量大到多少才算大数据喃&#xff1f;通常&#xff0c;当数据量达到TB乃至PB级别时&#xff0c;传统的关系型数据库在处理能力、存储效率或查询性能上可能会遇…

API接口调用|京东API接口|淘宝API接口

什么是电商API接口&#xff1a; 电商API接口是电商服务平台对外提供的一种接口服务&#xff0c;允许第三方开发者通过编程方式与电商系统进行数据交互和功能调用。 这些接口提供了一种标准化的方法来获取、更新或处理电商平台上的商品信息、订单状态、用户数据、支付信息、物流…

基于Spring Boot的汉服文化网站设计与实现

基于Spring Boot的汉服文化网站设计与实现 开发语言&#xff1a;Java 框架&#xff1a;springboot JDK版本&#xff1a;JDK1.8 数据库工具&#xff1a;Navicat11 开发软件&#xff1a;eclipse/myeclipse/idea 系统部分展示 系统功能界面图&#xff0c;在系统首页可以查看首页…

nvcc: command not found

nvcc: command not found nvcc命令是 NVIDIA CUDA 编译器&#xff0c;就类似于gcc是c语言的编译器&#xff0c;用于编译 CUDA 代码并生成 GPU 可执行文件。由于程序是要经过编译器编程成可执行的二进制文件&#xff0c;而cuda程序有两种代码&#xff0c;一种是运行在CPU上的ho…

【Windows】Windows电脑如何给hosts添加解析

前言 hosts是一个没有扩展名的系统文件&#xff0c;在浏览器输入一个我们想要登录的网站时&#xff0c;电脑系统会自动帮助我们在hosts文件中寻找对应的IP地址&#xff0c;然后打开网站&#xff0c;如果找不到对应IP&#xff0c;电脑就需要对域名和IP地址进行解析。当对一个网…

【吃透Java手写】Spring(下)-AOP-事务及传播原理

【吃透Java手写】Spring&#xff08;下&#xff09;AOP-事务及传播原理 6 AOP模拟实现6.1 AOP工作流程6.2 定义dao接口与实现类6.3 初始化后逻辑6.4 原生Spring的方法6.4.1 实现类6.4.2 定义通知类&#xff0c;定义切入点表达式、配置切面6.4.3 在配置类中进行Spring注解包扫描…

物联网技术、测试要点和测试标准

物联网定义 物联网&#xff1a;利用嵌入式电子设备、微芯片等连接车辆、家电、医疗设备&#xff0c;以收集和交换不同类型的数据&#xff0c;被称为物联网。借助物联网&#xff0c;用户能够远程控制设备&#xff0c;可以实现不同设备的互联。在现实生活中物联网开始有越来越多…

【Unity】使用Resources.LoadAll读取文件的顺序问题

最近在做客户的一个项目&#xff0c;其中的一个模块使用到了照片&#xff0c;但是发现了一个很严重的问题。当你在使用Unity的时候&#xff0c;它竟然不按照顺序读取&#xff1f;这个机器人是不是逻辑有问题&#xff1f;如下图&#xff1a; 名字脱敏了哈。。。 照片比较多&…

京东工业优选商品详情API接口:解锁高效工业采购新体验

京东工业优选的商品详情API接口&#xff0c;允许开发者通过程序化的方式&#xff0c;快速获取平台上的商品详细信息。这些详细信息包括但不限于商品名称、价格、规格、库存、图片、评价等&#xff0c;为企业提供全方位的商品信息查询服务。 二、API接口的主要功能 实时查询&a…

茶多酚复合纳米纤维膜

茶多酚复合纳米纤维膜是一种结合了茶多酚与纳米纤维技术的创新材料。茶多酚作为茶叶中多酚类物质的总称&#xff0c;具有抗氧化、抗辐射、抗*等多种药理作用&#xff0c;是一种非常有益的天然物质。而纳米纤维膜则因其超细纤维结构、高比表面积和高孔隙率等特性&#xff0c;在过…

武汉星起航:自运营团队—亚马逊平台运营典范,优势凸显业绩斐然

武汉星起航电子商务有限公司&#xff0c;作为跨境电商领域的佼佼者&#xff0c;凭借自运营团队多年的深耕经验与对亚马逊市场规则的深刻理解&#xff0c;成功在亚马逊平台开设多家自营店铺&#xff0c;并取得显著成绩。公司月流水达到几百万的辉煌业绩&#xff0c;不仅彰显了其…

聚焦丨酷雷曼受邀参加武汉市供需对接“赶集会”

2024年4月26日&#xff0c;由中共武汉市委市直机关工委联合市委统战部、市工商联等有关部门共同举办的“先锋行•破难题”“进企业”暨中小微企业供需对接“赶集会”人工智能专场活动成功举行&#xff0c;此次活动是落实武汉统一战线服务科技型中小微企业创新发展的重要举措之一…

全国在线选座电影票小程序app开发需要具备哪些条件api是必须的吗?

全国在线选座电影票小程序或APP的开发需要具备一系列的条件&#xff0c;而API&#xff08;应用程序编程接口&#xff09;通常是其中必不可少的一部分。以下是一些关键的条件和API的作用&#xff1a; 关键条件&#xff1a; 明确需求和目标&#xff1a;首先&#xff0c;你需要明…

7天入门Android开发之第2天——四大组件之活动

一、活动是什么 活动&#xff08;Activity&#xff09;是 Android 应用程序中的一个重要组件&#xff0c;它代表用户界面上的单个窗口&#xff0c;通常会填充整个屏幕。通过活动&#xff0c;可以创建各种各样的用户界面&#xff0c;并控制界面的行为。活动可以包含各种 UI 元素…

上传文件至linux服务器失败

目录 前言异常排查使用df -h命令查看磁盘使用情况使用du -h --max-depth1命令查找占用空间最大的文件夹 原因解决补充&#xff1a;删除文件后&#xff0c;磁盘空间无法得到释放 前言 使用XFTP工具上传文件至CentOS服务器失败 异常 排查 使用df -h命令查看磁盘使用情况 发现磁盘…