kafka学习笔记(三、生产者Producer使用及配置参数)

在这里插入图片描述


1.简介

1.1.producer介绍

生产者就是负责向kafka发送消息的应用程序。消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)序列化器(Serializer)分区器(Partitioner)的一系列作用后才能被真正的发往broker
demo:

public class KafkaClient {private static final String brokerList = "localhost:9092";private static final String tipic = "topic-test";public static Properties initConfig() {Properties props = new Properties();props.put("bootstrap.servers", brokerList);props.put("key.serializer", "org.apache.kafka.common.seralization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.seralization.StringSerializer");props.put("clinet.id", "producer.client.id.test");return props;} public static void main(String[] args) {Properties props = initConfig();KafkaProducer<String, String> producer = new KafkaProducer<>(props);ProducerRecord<String, String> record = new ProducerRecord<>(topic, "kafka producer test.");try {producer.send(record);} catch (Exception e) {e.printStackTrace();}}
}

ProducerRecord类:

public class ProducerRecord<K, V> {private final String topic; // 主题private final Integer partition; // 分区号private final Headers headers; // 消息头部private final K key; // 键private final V value; // 值private final Long timestamp; // 消息的时间戳
}
  • ProducerRecord类中的key属性:
    key用来指定消息的键,不仅是附加消息还可以用来计算分区号而可以让消息发往特定的分区。

    • 同一个key的消息会被划分到同一分区
    • 有key的消息还可以支持日志压缩的功能
  • 必要参数:

    • bootstrap.servers:客户端连接kafka集群所属broker地址(host1:port1),并非需要所有的broker地址,生产者会从给定的broker中查到其他broker信息,建议至少设置两个以上。
    • key.serializer和value.serializer:broker端接收的消息必须以字节数组(byte[])的形式存在。发往broker之前需要将消息中对应的key和value做相应的序列化操作来转换整字节数组。
  • KafkaProducer是线程安全的,可以在多个线程中共享单个KafkaProducer实例,也可以将KafkaProducer实例进行池化来供其他线程使用。
  • 消息发送有三种模式
    • 发后即忘(fire-and-forget):producer.send(record);
    • 同步(sync):producer.send(record).get();
    • 异步(async):producer.send(record, callback); // 回调函数的调用可以保证分区有序。

1.2.生产者拦截器

用来在消息发送前做一些准备工作,比如按照某个规则过滤、修改消息内容等,也可以用来在发送回调逻辑中做一些定制化的需求。

实现: 需自定义实现org.apache.kafka.clinets.producer.ProducerInterception接口。接口中有三个方法:

public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record); // 对消息进行相应的定制化处理// 在消息被应答或发送失败时调用,优先于Callback之前执行。此方法运行在Producer的IO线程中,所以
// 此方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。
public void onAcknowledgement(RecordMetadata metadata, Exception exception);public void close();

KafkaProducer可以指定多个拦截器形成拦截链。拦截链会按照interceptor.classes参数配置的拦截器顺序来执行。

1.3.序列化

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给kafka。而在对侧,消费者需要反序列化器(Deserializer)把从kafka中收到的字节数组转换成对应的对象。

客户端自带的序列化器都实现了org.apache.kafka.common.serialization.Serializer接口,此接口有三个方法:

public void configure(Map<String, ?> configs, boolean isKey); // 配置当前类,主要确定编码类型
public byte[] serialize(String topic, T data); // 将类型T的数据转换为byte[]
public void close(); // 关闭当前的序列化器,一般情况下为空方法

生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的。

1.4.分区器

为消息分配分区。如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。

kafka提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口,此接口中定义了两个方法:

// 用来计算分区号
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close(); // 关闭分区器的时候用来回收一些资源

Partitioner接口还有一个父接口org.apache.kafka.common.Configurable,此接口只有一个方法:

void configure(Map<String, ?> configs); // 获取配置信息机初始化数据

kafka中除了使用默认的分区器进行分区外还可以使用自定义的分区器,只要实现Partitioner即可。

2.整体架构

在这里插入图片描述

2.1.RecordAccumulator讲解

主要用来缓存消息以便Sender线程可以批量发送,进而减少网络传输的资源以便提升性能。(缓存大小通过生产者客户端参数buffer.memory配置,默认32MB)。

如果生产者发送消息的速度超过发送到服务器的速度,则会导致生产者空间不足,此时KafkaProducer的send()方法调用要么被堵塞要么抛出异常,这个取决于参数max.block.ms的配置,默认为60秒。

  • 主线程发送过来的消息都会被追加到RecordAccumulator的某个双端队列(Deque)中,RecordAccumulator内部为每个分区都维护了一个双端队列,队列的内容就是ProducerBatch,即Deque<ProducerBatch>

    注意: ProducerBatch中包含一个或多个ProducerRecord。ProducerRecord是生产者创建的消息,ProducerBatch是一个消息批次,ProducerRecord会被包含在ProducerBatch中,使字节的使用更加紧凑,也可以减少网络请求的次数以提高整体的吞吐量。

  • 消息写入缓存时,追加到Deque的尾部;Sender读取消息时,从Deque的头部读取。
  • Kafka生产者客户端通过java.io.ByteBuffer实现消息内存的创建和释放,不过频繁的创建和释放是比较耗费资源的,所以在RecordAccumulator内部存在一个BufferPool来实现ByteBuffer的复用,达到缓存高效利用的目的。

    BufferPool只针对特定大小(通过batch.size参数指定,默认16KB)的ByteBuffer进行管理,其他大小的ByteBuffer不会缓存到BufferPool中,我们可以适当的调大batch.size参数以便多缓存一些消息。

2.2.消息进入RecordAccumulator的逻辑

当一条ProducerRecord进入RecordAccumulator时:

  1. 先寻找与消息分区对应的Deque(没有则创建);
  2. 再从这个Deque尾部获取一个ProducerBatch(没有则创建);
  3. 查看ProducerBatch中是否可以写入这个ProducerRecord,可以则写入不可以则需新建ProducerBatch再写入;

    新建ProducerBatch时需评估这条消息是否超过batch.size参数的大小,如不超过则以batch.size参数大小来创建ProducerBatch(使用完这段内存区域后可以通过BufferPool的管理来复用),若超过则以评估的大小来创建(此段内存区域不会被复用)。

2.3.Sender步骤

Sender线程从RecordAccumulator中获取缓存的消息后:

  1. <分区, Deque<ProducerBatch>>转换为<Node, List<ProducerBatch>>,其中Node表示kafka集群的节点;

    对网络连接来说,生产者客户端是与具体的broker节点建立连接,不关心消息的分区;而KafkaProducer的应用逻辑则只关注往哪个分区发送那些消息,所以这里需要做一个应用逻辑层面到网络IO层面的转换

  2. 将转换的<Node, List<ProducerBatch>>进一步封装成<Node, Request>的形式,这样就可以将Request发往各个Node了。

    这里的Request指kafka的各种协议请求,对于消息的发送而言就是指具体的ProducerRequest。

  3. 请求在从Sender线程发往kafka之前还会以Map<NodeId, Deque<Request>>形式保存到InFlightRequest中,以缓存已经发出去但还没有收到相应的请求。

    通过配置参数可以限制每个连接(客户端与Node之间的连接)最多缓存的请求数。此配置参数为:max.in.flight.requests.per.connection,默认为5,即每个连接最多只能缓存5个未响应的请求,超过此值之后就不能再向这个连接发送更多的请求了,除非有缓存的请求收到了响应。

2.4.元数据

值kafka集群的元数据,这些数据包括集群中有主题信息、主题上的分区信息、分区的leader副本信息、follower副本信息、副本的AR和ISR集合信息、集群的节点信息以及控制器节点信息等。

更新元数据信息的条件:

  • 当客户端中没有需要使用的元数据信息,比如没有指定的Topic信息;
  • 超过metadata.max.age.ms时间没有更新元数据信息,此配置默认为5分钟;

更新元数据信息的步骤:

  1. 元数据更新操作时在是在客户端内部进行的,对外部不可见。
  2. 当需要更新元数据信息时,会先挑选出leastLoadedNode,然后向这个Node发送MetadataRequest请求来获取具体的元数据信息。
  3. 这个跟新操作由Sender线程发起,在创建完MetadataRequest之后同样会存入InFlightRequests,之后的步骤和发送消息类似。

元数据信息Sender线程负责更新,主线程需要读取,所以这里的数据同步通过synchronizedfinal关键字来保障。

Producer配置参数

参数默认值含义
bootstarp.servers“”指定连接kafka集群的broker地址(可以只有部分broker地址)
key.serializer“”消息中key对应的序列化类,需实现org.apahce.kafka.common.serialization.Serialiaer
value.serializer“”消息中value对应的序列化类,需实现org.apahce.kafka.common.serialization.Serialiaer
client.id“”指定kafkaProducer对应的客户端id(用来标记消息是从哪个客户端发来的)
acks“1”指定分区中必须有多少个副本收到此条消息生产者才会认为这条消息是成功写入的。它涉及消息的可靠性和吞吐量之间的权衡。asks=1:leader成功写入则返回成功;acks=0:发送消息后不需要等待服务器的响应;scks=-1/all:ISK集合中所有副本成功写入才能收到服务器的成功响应。
buffer.memory32MB生产者客户端中用于缓存消息的缓冲区大小。详见本章节#2.1
batch.size16KB用于指定producerBatch可以复用的内存区域的大小。
max.request.size1MB限制生产者客户端能发送消息的最大值,一般不建议盲目的增大,因为此参数与broker端的message.max.bytes参数有联动。
retries0生产者重试次数
retry.backoff.ms100设定两次重试之间的时间间隔
metadata.max.age30000ms如果在这个时间内无数据没有更新的话会被强制更新
compression.type“none”指定消息的压缩方式,默认情况下消息不会被压缩,,该参数还可以配置为"gzip"、"snappy"和"l24"。消息压缩可极大的减少网络传输量,减低网络的IO,提高整体的性能,是一种以时间换空间的优化方式。
connections.max.idle.ms540000ms指定多久之后关闭闲置的连接
linger.ms0指定生产者发送producerBatch之前等待更过的producerRecord加入ProducerBatch的时间。生产者客户端会在producerBatch被填满或等待时间超过linger.ms值时发送出去。
receive.buffer.bytes32KB设置socket接收消息缓冲区(SO_RECBUF)的大小。如果设置为-1,则使用操作系统的默认值;如果Producer与kafka处于不同的机房,则可以适当调大这个参数值。
send.buffer.bytes128KB设置socket发送消息缓冲区的大小…
request.timeout.ms30000ms配置Producer等待请求响应的最大时间。注意此参数需要比broker端参数replica.lag.time.max.ms的值大,这样可以减少因客户端重试而引起的消息重复的概率
max.block.ms60000用来控制kafkaProducersend()方法和partitionsFor()方法的阻塞时间。当生产者的送缓冲区已满或者没有可用的元数据时,这些方法就会阻塞。
partitioner.class~Defaultpartitioner用来指定分区器,需要实现org.apache.kafka.clients.producer.partitioner
enable.idempotencefalse是否开启幂等性功能。所谓幂等简单说就是对接口的多次调用所产生的结果和调用一次是一致的。生产者在进行重试的时候有可能会重复的写入消息,而使用kafka的幂等性功能之后就可以避免这种情况。
interceptor.classes“”用来设定生产者拦截器,需实现org.apache.kafka.clients.producer.ProducerInterceptor接口。
max.in.fligh.request5限制客户端与Node之间的连接最多缓存的请求数。
per.connection.transactional.idnull设置事务id,必须唯一

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3016445.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

【嵌入式必读】一文彻底理解PID自整定及PID自整定代码设计

文章目录 1. 前言2. PID简介3. 常用的PID自整定方法3.1 临界度比例法3.2 衰减曲线法 4. 继电反馈整定法原理4.1 继电反馈自整定的基本思想4.2 继电反馈自整定原理 5. 算法设计5.1 振荡的生成5.2 提取出临界周期 T c T_c Tc​和振荡波形幅值 A A A5.3 计算出PID参数 6 原代码6.1…

VmWare 虚拟机没有网络解决办法

由于最近需要&#xff0c;装了个VM虚拟机&#xff0c;但是突然发现本机有网络&#xff0c;虚拟机却没有网络&#xff0c;更换了虚拟机的网络设置&#xff0c;都尝试过了 都不管用&#xff0c; 最后尝试了这种方法完美解决 还原网络默认设置 首先还原虚拟网络编辑器设置 启动V…

小程序开通wx.getlocation接口原来还有这个方法

小程序地理位置接口有什么功能&#xff1f; 在平时我们在开发小程序时&#xff0c;难免会需要用到用户的地理位置信息的功能&#xff0c;小程序开发者开放平台新规要求如果没有申请开通微信小程序地理位置接口( getLocation )&#xff0c;但是在代码中却使用到了相关接口&#…

软件开发者如何保护自己的知识产权?

最近一个关于开源软件的知识产权纠纷的案例&#xff0c;非常有代表性&#xff0c; 其中涉及到的平台openwrt&#xff0c;一口君十几年前曾玩过&#xff0c; 通过这个案例&#xff0c;我们可以学习如何在今后工作中保护自己的知识产权&#xff0c; 以及如何合理直接或者间接利…

08 - 条件判断语句

---- 整理自狄泰软件唐佐林老师课程 文章目录 1. 条件判断语句2. 语法说明3. 经验4. 代码 1. 条件判断语句 makefile 中支持条件判断语句 可以根据条件的值来决定 make 的执行可以比较两个不同变量或者变量和常量的值 注&#xff1a;条件判断语句只能用于控制 make 实际执行的…

zabbix监控方式(zabbix-trapper)

中文&#xff1a;zabbix采集器&#xff0c;即zabbix sender 。 Zabbix-Trapper 监控方式可以一次批量发送数据给Zabbix Server&#xff0c;与主动模式不同&#xff0c;Zabbix-Trapper 可以让用户控制数据的发送&#xff0c;而不用Zabbix-Agent进程控制&#xff0c;这意味着可以…

品深茶的抗癌功能是否涉及虚假宣传?

品深茶说到底&#xff0c;本质还是中国传统茶叶&#xff0c;茶叶本就是一种含有多种成分的饮品&#xff0c;包括茶多酚、生物碱、氨基酸、有机酸等。这些成分对人体有一定的益处&#xff0c;如抗氧化、抗炎、抗菌等作用。 一些研究表明&#xff0c;茶叶中的某些成分如茶多酚、…

C语言----汉诺塔问题

1.什么是汉诺塔问题 简单来说&#xff0c;就是有三个柱子&#xff0c;分别为A柱&#xff0c;B柱&#xff0c;C柱。其中A柱从上往下存放着从小到大的圆盘&#xff0c;我们需要借助B柱和C柱&#xff0c;将A柱上的所有圆盘转移到C柱上&#xff0c;并且一次只能移动一个圆盘&#…

用户管理中心——数据库设计用户注册逻辑设计

用户管理中心——数据库设计&用户注册逻辑设计 规整项目目录1. 数据库自动生成器的使用实现基本的数据库操作&#xff08;操作user表&#xff09; 2. 注册逻辑的设计(1) 写注册逻辑(2) 实现(3) 测试代码 3. 遇到的问题 规整项目目录 utils–存放工具类&#xff0c;比如加密…

Jsoncpp介绍

1.简介 Jsoncpp 是一个 C 库&#xff0c;用于解析和生成 JSON 数据。它提供了一个易于使用的 DOM&#xff08;Document Object Model&#xff09;风格的 API&#xff0c;允许开发者以树形结构的方式操作 JSON 数据。 Jsoncpp 是一个C库&#xff0c;允许操作JSON值&#xff0c;…

【软测学习笔记】Python入门Day02

&#x1f31f;博主主页&#xff1a;我是一只海绵派大星 &#x1f4da;专栏分类&#xff1a;软件测试笔记 &#x1f4da;参考教程&#xff1a;黑马教程❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ python安装 1、进入Python的官方下载页面&#xff1a; Download Python | Py…

【论文阅读笔记】Mamba模型代码理解

0.开源代码地址 官方实现&#xff1a;state-spaces/mamba (github.com) 最简化实现&#xff1a;johnma2006/mamba-minimal: Simple, minimal implementation of the Mamba SSM in one file of PyTorch. (github.com) 直接实现&#xff1a;alxndrTL/mamba.py: A simple and e…

网络安全与IP地址的关联

网络安全与IP地址之间存在着密不可分的关系。IP地址作为网络通信的基础&#xff0c;对于网络安全的保障具有至关重要的作用。以下将详细探讨网络安全与IP地址之间的关联&#xff0c;以及IP地址在网络安全中的应用。 一、IP地址与网络安全的关系 IP地址是网络通信的基础&#x…

力扣437. 路径总和 III

Problem: 437. 路径总和 III 文章目录 题目描述思路复杂度Code 题目描述 思路 1.定义int类型函数rootSum(root, targetSum)&#xff0c;用于求取每一个节点等于目标函数的路径数&#xff1a; 1.1.易知rootSum(root, targetSum)求出的数量等于rootSum(root.left, targetSum - va…

C语言 main( ) 函数的指针数组形参是怎么回事?

一、问题 在使⽤⼀些开发⼯具⽣成C语⾔⽂件时&#xff0c;主函数 mian( ) 中会有参数&#xff0c;这个参数到底是怎么回事⼉呢&#xff1f; 二、解答 mian( ) 称为主函数&#xff0c;是所有程序运⾏的⼊口。 mian( ) 函数是由系统调⽤的&#xff0c;当处于操作命令状态下&…

【C 数据结构-动态内存管理】4. 无用单元收集(垃圾回收机制)

文章目录 【 1. 问题描述与解决方法 】【 2. 中断回收机制 】 【 1. 问题描述与解决方法 】 问题描述 动态存储管理的运行机制可以概括为&#xff1a;当用户发出申请空间的请求后&#xff0c;系统向用户分配内存&#xff1b;用户运行结束释放存储空间后&#xff0c;系统回收内…

深度学习之基于Tensorflow卷积神经网络智能体操健身系统

欢迎大家点赞、收藏、关注、评论啦 &#xff0c;由于篇幅有限&#xff0c;只展示了部分核心代码。 文章目录 一项目简介 二、功能三、系统四. 总结 一项目简介 一、项目背景 随着人们健康意识的提高和数字化技术的快速发展&#xff0c;智能健身系统逐渐成为健身领域的新趋势。…

MT8370_联发科MTK8370(Genio 510)芯片性能规格参数

MT8370芯片是一款利用超高效的6nm制程工艺打造的边缘AI平台&#xff0c;具有强大的性能和功能。这款芯片集成了六核CPU(2x2.2 GHz Arm Cortex-A78 & 4x2.0 GHz Arm Cortex-A55)、Arm Mali-G57 MC2 GPU、集成的APU(AI处理器)和DSP&#xff0c;以及一个HEVC编码加速引擎&…

C语言 动态内存管理

目录 1. C/C程序的内存分配2. 动态内存分配的作用3. malloc - 分配内存4. free - 释放内存5. calloc - 分配并清零内存6. realloc - 调整之前分配的内存块7. 常见的动态内存的错误7.1 对空指针解引用7.2 对动态开辟空间的越界访问7.3 对非动态开辟内存使用free7.4 使用free释放…

代码随想录算法训练营第十九天:二叉树go

代码随想录算法训练营第十九天&#xff1a;二叉树go 226.翻转二叉树 力扣题目链接(opens new window) 翻转一棵二叉树。 ​​ 这道题目背后有一个让程序员心酸的故事&#xff0c;听说 Homebrew的作者Max Howell&#xff0c;就是因为没在白板上写出翻转二叉树&#xff0c;最…