springboot整合kafka入门

kafka基本概念

producer: 生产者,负责发布消息到kafka cluster(kafka集群)中。生产者可以是web前端产生的page view,或者是服务器日志,系统CPU、memory等。

consumer: 消费者,每个consumer属于一个特定的consuer group(可为每个consumer指定group name,若不指定group name则属于默认的group)。创建消费者时,要指定消费者接受的消息的topic,该消费者只会接受该topic的消息。

topic: 每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)。

broker: kafka集群包含一个或多个服务器,这些服务器就叫做broker。

本机安装kafka测试

安装kafka(mac下)

kafka下载: 从官网下载 kafka_2.13-2.7.0.tgz,直接解压即可。

本机测试kafka

1、进入到kafka的解压目录,输入命令启动zookeeper:

./bin/zookeeper-server-start.sh config/zookeeper.properties

复制

打开另一个终端输入命令启动kafka:

./bin/kafka-server-start.sh config/server.properties 

复制

2、服务启起来后,可以创建生产者和消费者了。 再打开另一个终端输入命令创建生产者:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic

复制

broker-list: 参数指定生产者所使用的broker localhost: 9092 参数表示broker,这个broker为本机(127.0.0.1),且使用的端口是kafka的默认端口号是9092 topic: 参数表示生产者生产的消息的topic 为 “test_topic”

最后再打开另一个终端创建消费者:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic --from-beginning

复制

bootstrap-server: 是指定consumer从哪里(broker)取出消息 topic: 指定消费者consumer取出的 topic 为“test_topic”的消息。 from-beginning: Kafka实际环境有可能会出现Consumer全部宕机,虽然基于Kafka的高可用特性,消费者群组中的消费者可以实现再均衡,所有Consumer不处理数据的情况很少,但是还是有可能会出现,此时就要求Consumer重启的时候能够读取在宕机期间Producer发送的数据。基于消费者订阅模式默认是无法实现的,因为只能订阅最新发送的数据。通过消费者命令行可以实现,只要在命令行中加上–from-beginning即可

3、都创建完了可以通过生产者输入消息,消费者来接收并显示消息,效果图如下:

springboot整合kafka(IDEA)

注意: kafka要是部署在服务器的话,本机就 要和服务器之间能ping通。

1、创建springboot项目:

2、创建两个类,分别为生产者和消费者 项目目录结构:

配置文件application.yml:(一般项目自动生成的是applicaiton.properties,但为了书写简便,改成yml)

spring:kafka:bootstrap-servers: 127.0.0.1:9092 #服务器的ip及端口,可以写多个,服务器之间用“:”间隔producer: #生产者配置key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer: #消费者配置group-id: test #设置消费者的组idenable-auto-commit: true
# auto-commit-interval: 1000key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer

复制

springboot启动类入口,KafkaStudyApplication.java:

package com.study.kafka.kafka_study;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class KafkaStudyApplication { public static void main(String[] args) { SpringApplication.run(KafkaStudyApplication.class, args);}}

复制

TestKafkaProducerController.java:(生产者)

package com.study.kafka.kafka_study;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController     //定义这是一个控制器,可以通过浏览器访问
@RequestMapping("/kafka")
public class TestKafkaProducerController { @Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//当在浏览器上输入http://localhost:8080/kafka/send?msg=abc,就会发送abc到服务器上去让消费者接收,msg对应下面的String msg
@RequestMapping("/producerSend")
public String send(String msg){ kafkaTemplate.send("test_topic", msg); //使用kafka模板发送信息
return "success";
}
}

复制

TestConsumer.java:(消费者)

package com.study.kafka.kafka_study;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class TestConsumer { /** * 定义此消费者接收topic为“test_topic”的消息,监听服务器上的kafka是否有相关的消息发过来 * @param record record变量代表消息本身,可以通过ConsumerRecord<?,?>类型的record变量来打印接收的消息的各种信息 * */
@KafkaListener(topics = "test_topic")
public void listen (ConsumerRecord<?, ?> record) throws Exception { System.out.printf("topic = %s, offset = %d, value = %s \n", record.topic(), record.offset(), record.value());
}
}

复制

测试

1、运行KafkaStudyApplication.java之后,终端上输入消息时,不仅终端上(服务器)运行的测试消费者能收到,IDEA上的程序也能收到。

2、在浏览器上输入http://localhost:8080/kafka/producerSend?msg=web world31231,不仅IDEA上的消费者能收到,在终端(服务器)上运行的测试消费者也能收到:(其中8080是tomcat服务器的端口,springboot默认下带的是tomcat)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/254456.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

Linux系统下imx6ull QT编程—— C++类和对象(三)

Linux QT编程 文章目录 Linux QT编程一、类和对象 一、类和对象 C 在 C 语言的基础上增加了面向对象编程&#xff0c;C 支持面向对象程序设计。类是 C 的核心特性&#xff0c;通常被称为用户定义的类型。类用于指定对象的形式&#xff0c;它包含了数据表示法和用于处理数据的方…

PHP + 小程序开发过程

前言 在此之前&#xff0c;我也曾经写过一篇用Go语言开发的过程总结&#xff01;当然这次也不例外&#xff0c;同样记录下过程&#xff0c;方便后续再次接触时&#xff0c;重新拾起会轻松点。 我特意地看了一下博客文章的记录&#xff0c;从项目的启动时间&#xff08;2017-1…

php开发微信小程序教程,从零开始开发微信小程序步骤(三)

前面我们和大家分享了如何创建一个新的页面和设置页面的标题,这一章我们来聊聊底部导航栏是如何实现的。即点击底部的导航,会实现不同对应页面之间的切换。我们先来看个我们要实现的底部导航栏的效果图:(三个导航图标示例,微信小程序最多能加5个)。 1. 图标准备 阿里图标库…

小程序中商家入驻提醒、新订单提醒

1. 应用场景 ThinkPHP技术QQ群: 828567087 用户在小程序商城购买下单之后&#xff0c;商家如何能及时收到新订单提醒&#xff0c;进行发货处理呢&#xff1f; 用户在小程序中申请入驻商家后&#xff0c;平台管理员如何能及时收到入驻申请通知&#xff0c;进行及时审核处理呢&…

微信小程序与PHP数据交互

微信js源码 Page({onLoad: function () {var that thiswx.request({//要交互页面的地址url: http://localhost/php/index.php/Wxwater/Test/test,data:{pid:1 //data里边使我们要传递给PHP的数据},method: GET,headers: {Content-Type: application/json},success(res) {cons…

PHP后台-微信小程序发送订阅消息(详细)

微信小程序订阅消息文档 步骤一&#xff1a;获取订阅消息模板 ID 在微信公众平台手动配置获取模板 ID&#xff1a; 登录 https://mp.weixin.qq.com 获取模板&#xff0c;如果没有合适的模板&#xff0c;可以申请添加新模板&#xff0c;审核通过后可使用。 在公告模板库找到要…

微擎小程序PHP,微擎配置小程序教程

微擎小程序通用配置图文教程&#xff0c;教会你怎么配置微擎小程序&#xff01; 1、去源码下载微擎小程序&#xff0c;这里以官网小程序为列&#xff0c;教大家如何配置微擎小程序。 2、将对应的官网小程序后端文件夹hc_step上传到微擎的addons目录下 3、在微擎安装好官网小程序…

php小程序二维码获取生成图片分享

第一步&#xff1a;获取小程序二维码 代码如下(采用tp5框架&#xff09;&#xff1a; namespace app\api\controller; use think\Controller; use think\Db; use think\Config; use app\api\controller\Common;class Code extends Common{/*** [getXcxCode 获取微信小程序二维…

微信小程序后台php实现数据get传递

1&#xff1a;利用phpstudy搭建本地服务器&#xff0c;端口设置为88 2&#xff1a;访问http://localhost:88/phpmyadmin/&#xff0c;在test数据库中新建一个表li&#xff0c;表中添加一个字段name&#xff0c;增加两行数据来进行测试 3&#xff1a;网站www根目录下新建一个wxa…

PHP 管理小程序审核发布

目录 前言一、申请小程序、开发平台账号二、使用步骤 1.获取token2.发布小程序总结 前言 随着微信文档功能越来越多 &#xff0c;大多数数据越来越容易便捷&#xff0c;本文就介绍小程序在后台的操作管理 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考…

php实现微信小程序消息通知

接入消息通知指引地址&#xff1a;https://mp.weixin.qq.com/debug/wxadoc/dev/api/custommsg/callback_help.html 文档地址&#xff1a;https://mp.weixin.qq.com/debug/wxadoc/dev/api/notice.html#%E6%A8%A1%E7%89%88%E6%B6%88%E6%81%AF%E7%AE%A1%E7%90%86 看完这两个地址…

crmeb 多商户小程序配置

一、小程序下载并提交审核 下载微信小程序代码 位置&#xff1a;平台后台>应用>小程序 1 首先需要填写这里的小程序信息 然后在这里就可以下载&#xff0c;如果未开启直播要选择&#xff0c;否则会无法使用 2、下载编译后的小程序代码 没有配置小程序的需要先配置小程序…

微信小程序中国标准时间注册错误

在运行微信小程序的时候出现下图错误&#xff1a; 解决办法&#xff0c;在控制台当中输入openVendor() 会打开文件。如下图&#xff1a;删除圈中的文件&#xff0c;重启工具即可&#xff1a;

微信公众号支付报错:当前页面的url未注册

问题描述 公众号支付报错&#xff1a;“当前页面的URL未注册” 原因分析 用户实际的支付目录必须和在微信支付商户平台设置的一致&#xff0c;否则会报错"当前页面的URL未注册" 支付授权目录: 商户最后请求拉起微信支付收银台的页面地址 解决方案&#xff1a; 登…

基于原生微信小程序的时间组件

作者的絮叨 在开发原生微信小程序的时候&#xff0c;发现很少有基于原生微信小程序开发的相关插件或组件&#xff0c;不知道是不是我的孤陋寡闻&#xff0c;还是真的很少&#xff0c;暂且不管~下面我介绍一下我当时开发的一个时间组件&#xff0c;比较简陋&#xff0c;希望可以…

java后端微信小程序登录与注册

java后端微信小程序登录与注册&微信登录授权 分析: 微信小程序用户表 的字段来源于微信服务器 , 必须想办法去获取到对应的用户信息 找到微信开放平台: 微信开放平台 以下是微信开放平台给出的登录流程图: 微信给出的字段值: {"nickName": "Band",&…

微信OAuth2.0授权登录

微信OAuth2.0授权登录 OAuth2.0简介OAuth2的应用微服务安全社交登录 网页微信登录前期准备授权流程 服务器端开发需求网页显示二维码返回微信登录参数添加配置添加配置类controller 前端显示登录二维码封装api请求 处理微信回调添加httpclient工具类添加回调接口获取access_tok…

springboot微信登陆

微信登录的优势 目前微信用户数量巨大&#xff0c;用户更希望通过更快更便捷的方式进行登录&#xff0c;而不是传统的账号密码登录。 springboot 接入微信登陆 准备工作 网站应用微信登录是基于OAuth2.0协议标准构建的微信OAuth2.0授权登录系统。 在进行微信OAuth2.0授权登…

微信小程序注册/登录接口开发

文章目录 后端有关说明前端有关说明接口设计小程序注册/登录接口APP 注册/登录接口PC Web 端的注册/登录接口 小程序注册/登录序列图校验 token 后端有关说明 登录和注册的逻辑要独立抽取写成2个接口&#xff1a;注册接口、登录接口 小程序、APP、PC端的登录接口和注册接口要分…

服务器 微信报警平台,Zabbix实现微信报警

zabbix(音同 zbix)是一个基于WEB界面的提供分布式系统监视以及网络监视功能的企业级的开源解决方案。 zabbix能监视各种网络参数&#xff0c;保证服务器系统的安全运营&#xff1b;并提供灵活的通知机制以让系统管理员快速定位/解决存在的各种问题。 下文讲述了如何通过微信实现…