Kafka的入门及简单使用

文章目录

  • 前言
  • 一、Kafka 的基本架构?
    • 1. Producer(生产者)
    • 2. Broker(代理/服务器)
    • 3. Consumer(消费者)
    • 4. Consumer Group(消费者组)
    • 5. Topic(主题)
    • 6. Partition(分区)
    • 7. Replication(复制)
    • 8. ZooKeeper
  • 二、代码测试
    • 1.引入依赖
    • 2.启动
    • 3.创建生产者
    • 4.创建消费者
    • 5.结果


前言

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。

Kafka 的核心特性可以总结为以下几个方面:

  1. 发布订阅模型

    • Kafka 使用类似于消息队列的发布订阅模型,但更侧重于消息持久化以及支持多消费者模型。
    • 生产者(Producer)将消息发送到主题(Topic),消费者(Consumer)则订阅这些主题来消费消息。
    • 消费者可以是多个消费者组成的消费者组(Consumer Group),这样可以实现消息的并行处理。
  2. 可扩展性

    • Kafka 能够水平扩展,通过增加更多的服务器节点可以提升系统的吞吐量。
    • Kafka 可以部署在分布式集群中,具有很强的容错能力。
  3. 持久性和可靠性

    • Kafka 将消息存储在磁盘上,并允许复制到多个服务器上以防止数据丢失。
    • Kafka 保证消息的顺序性,在一个分区内的消息会按照它们被发送的顺序存储和读取。
  4. 高性能

    • Kafka 设计为支持高吞吐量,即使在非常大的数据集上也能保持低延迟。
    • Kafka 利用零拷贝技术来提高性能,这意味着它可以在不复制数据的情况下直接从磁盘读取数据到网络栈。
  5. 存储

    • Kafka 的数据存储是基于日志文件的,这意味着它可以有效地存储大量数据。
    • Kafka 支持数据保留策略,可以根据时间和大小来决定何时删除旧数据。
  6. 流处理

    • Kafka Streams API 允许开发者创建复杂的流处理应用程序,如实时聚合、过滤和转换数据等操作。
    • Kafka 还与其他流处理框架(如 Apache Flink 和 Apache Spark Streaming)集成良好。

Apache Kafka 是一个非常强大的流处理平台,它被广泛应用于多种不同的场景中。以下是 Kafka 的一些典型应用场景:

1. 日志处理与分析

  • 日志收集:Kafka 可以收集来自不同服务的日志数据,如 Web 服务器、应用服务器、数据库服务器等。
  • 日志聚合:将收集的日志数据聚合起来,以便进一步分析。
  • 日志分析:通过集成工具(如 Apache Flink, Hadoop, Elasticsearch 等)进行实时或批量分析。

2. 消息队列

  • 异步通信:Kafka 可以作为消息中间件,实现服务间的异步通信,降低服务耦合度。
  • 消息缓冲:在消息生产者和消费者之间提供缓冲,帮助平衡负载。
  • 服务解耦:通过消息队列实现服务之间的解耦。

3. 实时数据流处理

  • 实时分析:处理实时数据流,执行复杂的事件处理、转换和分析操作。
  • 流式数据处理:构建实时数据处理流水线,例如实时计算、警报触发等。

4. 系统监控与报警

  • 指标收集:收集各种监控指标,并实时处理这些数据。
  • 异常检测:基于实时数据流检测异常行为,并及时发出警报。

5. 流量削峰

  • 负载均衡:通过设置消息队列的最大容量来控制客户端流量,避免后端服务过载。

6. 高可用性

  • 多副本冗余:Kafka 的多副本机制确保了数据的高可用性。
  • 容错性:即使部分节点发生故障,Kafka 仍然能够保证数据的可靠性和持续的服务。

7. 分布式任务调度

  • 任务管理:通过 Kafka 发布任务,多个消费者可以并发地处理这些任务。

8. 物联网 (IoT)

  • 传感器数据处理:处理来自 IoT 设备的大量数据流。

一、Kafka 的基本架构?

在这里插入图片描述
下面是 Kafka 架构的主要组成部分:

1. Producer(生产者)

  • 生产者是向 Kafka 发布消息的应用程序或服务。
  • 生产者可以选择将消息发送到特定的 topicpartition
  • 生产者还负责选择消息的 key,这会影响消息如何被分发到分区。

2. Broker(代理/服务器)

  • Kafka 集群由一个或多个服务器组成,这些服务器称为 Broker。
  • Broker 负责接收来自生产者的消息并将它们存储在磁盘上。
  • 每个 Broker 可以托管一个或多个 topic 的分区。
  • Broker 也负责将消息发送给消费者。

3. Consumer(消费者)

  • 消费者是从 Kafka 中读取消息的应用程序或服务。
  • 消费者可以订阅一个或多个 topic,并且只能从自己订阅的 topic 中读取数据。
  • 消费者可以从特定的时间点开始读取历史数据,也可以从最新的消息开始读取。

4. Consumer Group(消费者组)

  • 一个 consumer group 是一组具有相同 group id 的消费者。
  • 每个 topic 的分区只会被分配给同一个 consumer group 中的一个消费者。
  • 这种机制允许消息被并行处理,并且提供了容错性,因为如果一个消费者失败,它的分区会被重新分配给同一个组内的其他消费者。

5. Topic(主题)

  • Topic 是一种逻辑类别,用于组织和发布消息。
  • 一个 topic 可以包含多个 partition
  • 每个 topic 都有一个预定义的配置,比如分区数量、复制因子等。

6. Partition(分区)

  • Partitiontopic 的物理分割,每个 partition 是一个有序的不可变消息序列。
  • 分区可以分布在多个 Broker 上,这使得 Kafka 能够水平扩展。
  • 每个 partition 都有一个主 broker(leader)以及可能的一些副本 broker(follower)。

7. Replication(复制)

  • 为了确保高可用性和容错性,每个 partition 都可以有多个副本。
  • 除了一个主副本(leader)之外,还可以有从副本(follower),这些副本会在不同的 Broker 上存储同一份数据。
  • 如果主副本出现故障,其中一个从副本可以被提升为主副本。

8. ZooKeeper

  • Kafka 使用 ZooKeeper 来管理集群的元数据,包括:
    • broker 的注册和发现。
    • topic 的配置。
    • consumer group 的状态。
    • 分区的领导者选举。

二、代码测试

1.引入依赖

<!-- Spring Kafka Starter -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

2.启动

在Linux下安装、配置、启动
在IDEA中配置

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.创建生产者

@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);private static final String TOPIC = "admin-messages";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@PostMapping("/sendMessage")public String sendMessage(@RequestBody FrontEvent frontEvent) {String json = JSON.toJSONString(frontEvent);kafkaTemplate.send(TOPIC, json);log.info("Send message=========={}",json);return "send success!";}
}

4.创建消费者

@Component
public class Consumer{private static final Logger log = LoggerFactory.getLogger(Consumer.class);@KafkaListener(topics = "admin-messages",groupId = "myGroupId")public void receiveAdminMessage(String message) {try {FrontEvent frontEvent = JSON.parseObject(message, FrontEvent.class);log.info("Received message=========={}", frontEvent.toString());// 保存到数据库//frontEventRepository.save(frontEvent);} catch (Exception e) {// 如果发生异常,则需要进行失败重试,需手动设置重试次数,死信队列 // 手动ack ack.acknowledge();log.error("Failed to process the message: {}", message, e);}}
}

5.结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3280700.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

openJdk21镜像打包制作

文章目录 一.目的&#xff1a;二.dockerfile 代码三.调试四.效果图五.声明 一.目的&#xff1a; 制作基于openjdk21的基础镜像&#xff0c;方便后续使用 内容&#xff1a; 1.使用的是Debian 11 slim 作为基础镜像&#xff08;在此感谢no name大佬提醒我alpine做为基础镜像不稳…

TypeScript 与 JavaScript 的对比区别

还是大剑师兰特&#xff1a;曾是美国某知名大学计算机专业研究生&#xff0c;现为航空航海领域高级前端工程师&#xff1b;CSDN知名博主&#xff0c;GIS领域优质创作者&#xff0c;深耕openlayers、leaflet、mapbox、cesium&#xff0c;canvas&#xff0c;webgl&#xff0c;ech…

保研408真题练习:2009年全国硕士研究生入学统一考试(单选篇1)

&#x1f9ca;&#x1f9ca;&#x1f9ca;单项选择题&#xff08;共40道&#xff09; &#x1f9ca;数据结构&#xff08;10道&#xff09; &#x1f965;1.打印机的缓冲区逻辑结构 栈&#xff1a;先进后出&#xff1b; 队列&#xff1a;先进先出。 缓冲区的作用是解决主机…

EasyExcel 初使用—— Java 实现多种写入 Excel 功能

前言 大家好&#xff0c;我是雪荷。之前有一篇博客&#xff08;EasyExcel 初使用—— Java 实现读取 Excel 功能_java easyexcel.read-CSDN博客&#xff09;介绍了 Java 如何读取 Excel 表格&#xff0c;那么此篇博客就和大家介绍下 Java 如何利用 EasyExcel 写入 Excel。 Ea…

基于PHP+MySQL组合开发的微信活动投票小程序源码系统 带完整的安装代码包以及搭建部署教程

系统概述 在当今数字化时代&#xff0c;微信作为社交媒体的巨头&#xff0c;为企业和个人提供了丰富的互动营销平台。其中&#xff0c;投票活动作为一种有效的用户参与和互动方式&#xff0c;被广泛应用于各种场景。为了满足这一需求&#xff0c;我们推出了一款基于PHPMySQL组…

【原创教程】电气电工主要做什么?(入门篇)

本系列文章主要介绍工业电气电工所涉及到的操作技能&#xff0c;器件原理&#xff0c;图纸识别&#xff0c;以及电气电工质量管理。掌握本系列的技能&#xff0c;将能够胜任电气自动化方面的电气电工工作。 电气电工&#xff0c;它是一个细分领域&#xff0c;是目前许多公司迫…

【Python系列】Python 中`eval()`函数的正确使用及其风险分析

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

2024服贸会媒体邀约资源表-附媒体名单

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 2024年服贸会作为全球服务贸易领域的重要盛会&#xff0c;将再次聚焦全球目光。计划于2024年举行的服贸会将继续发挥其作为国际服务贸易桥梁的作用&#xff0c;不仅是展示中国服务贸易成…

激光传感器 - 从零开始认识各种传感器【第二十一期】

激光传感器|从零开始认识各种传感器 1、什么是激光传感器 激光传感器是一种利用激光技术来进行测量和检测的设备。这类传感器使用激光光束来探测目标物体的位置、距离、速度或其他特性。激光传感器具有精度高、测量距离长&#xff0c;抗干扰能力强的特点。 2、激光传感器是如何…

Linux下杀死进程和线程的方法

文章目录 1. 背景介绍2. 命令介绍2.1 kill 命令2.2 pkill 命令2.3 top 或 htop 命令2.4 ps 命令 3. 使用方法3.1 杀死指定PID的进程3.2 杀死指定用户的所有进程3.3 杀死所有特定名称的进程3.4 使用 top 或 htop 杀死进程 4. 代码示例5. 总结 1. 背景介绍 在Linux操作系统中&am…

电脑格式化了还能恢复数据吗?

在日常使用电脑的过程中&#xff0c;我们可能会因为各种原因需要格式化硬盘。然而&#xff0c;格式化操作会清除硬盘上的所有数据&#xff0c;很多人担心格式化后数据无法找回。本文将详细介绍电脑格式化后的数据恢复方法&#xff0c;帮助大家在不小心格式化硬盘后&#xff0c;…

安卓赤拳配音v1.0.3Ai配音神器+百位主播音色

Ai配音神器 本人自用版本&#xff01;超级稳定&#xff01;百位主播音色 登陆即可用 链接: https://pan.baidu.com/s/1NqSx32vB-xvij-1T_qYqxQ?pwdnb3b 提取码: nb3b

golang JSON序列化

JSON JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式。 易于人阅读和编写。同时也易于机器解析和生成。 它基于JavaScript Programming Language, Standard ECMA-262 3rd Edition - December 1999的一个子集。 json历史 [外链图片转存失败,源站可能有防盗链机…

Nginx代理路径被吃

Nginx代理路径被吃的情况 日常工作中经常使用nginx反向代理一些资源&#xff0c;有时正常代理&#xff0c;发现代理不过去。 验证被吃调location情况 通过浏览器访问&#xff1a; https://zhao138969.com/LinuxPackage/Python/SelectDocker location /LinuxPackage { proxy…

⌈ 传知代码 ⌋ 利用scrapy框架练习爬虫

&#x1f49b;前情提要&#x1f49b; 本文是传知代码平台中的相关前沿知识与技术的分享~ 接下来我们即将进入一个全新的空间&#xff0c;对技术有一个全新的视角~ 本文所涉及所有资源均在传知代码平台可获取 以下的内容一定会让你对AI 赋能时代有一个颠覆性的认识哦&#x…

花10分钟写个漂亮的后端API接口模板!

你好&#xff0c;我是田哥 在这微服务架构盛行的黄金时段&#xff0c;加上越来越多的前后端分离&#xff0c;导致后端API接口规范变得越来越重要了。 比如&#xff1a;统一返回参数形式、统一返回码、统一异常处理、集成swagger等。 目的主要是规范后端项目代码&#xff0c;以及…

苹果FaceTime诈骗泛滥,罪魁祸首是过时的隐私机制

在科技水平飞速发展的当下&#xff0c;手机、手表、电视等消费电子产品朝着智能化方向不断迭代。一方面&#xff0c;它们给我们的生活带来了便利。另一方面&#xff0c;这些电子产品经常被部分“有心人”利用&#xff0c;成为高科技电信诈骗的重要渠道之一。为了从你的手上骗取…

Android使用Fiddler模拟弱网络环境测试

之前安卓设置代理的步骤不再赘述 打开fiddler&#xff0c;默认情况下Rules –> Performances –> Simulate Modem Speeds 是未勾选状态&#xff0c;网络正常。当选中此选项&#xff08;模拟光猫网速&#xff09;后&#xff0c;网速就会变很慢&#xff0c;打开一个网页要加…

公布一批神马爬虫IP地址,真实采集数据

一、数据来源&#xff1a; 1、这批神马爬虫IP来源于尚贤达猎头公司网站采集数据&#xff1b; 2、数据采集时间段&#xff1a;2023年10月-2024年1月&#xff1b; 3、判断标准&#xff1a;主要根据用户代理是否包含“YisouSpider”&#xff0c;具体IP没做核实。 二、神马爬虫主…

DataKit之OpenGauss数据迁移工具

#1 关闭防火墙 systemctl stop firewalld systemctl disable firewalld systemctl status firewalld#2 当前JDK版本 wget https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gzvim /etc/profile export JAVA_HOME/usr/local/jdk-11.0.2 export …