第二篇 外部系统集成
Flume、Spark、Flink、SpringBoot 这些组件都可以作为kafka的生产者和消费者,在企业中非常常见。
Flume官网:Welcome to Apache Flume — Apache Flume
Flink:Apache Flink_百度百科
Spark:Apache Spark_百度百科
集成SpringBoot:略。
第三篇 生产调优手册
第1章 kafka硬件配置选择
第2章 生产者调优
2.1 生产者核心参数配置
发送流程:做到心中有图,回忆。
核心参数:
参数名称 | 描述 |
---|---|
bootstrap.servers | 生产者连接集群所需的 broker 地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息。 |
key.serializer 和 value.serializer | 指定发送消息的 key 和 value 的序列化类型。一定要写全类名。 |
buffer.memory | RecordAccumulator 缓冲区总大小,默认32m。 |
batch.size | 缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。 |
linger.ms | 如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。 |
acks | 0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 收到数据后应答。 -1(all):生产者发送过来的数据,eader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和 all是等价的。 |
max.in.flight.requests.per.connection | 允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。 |
retries | 当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1,否则在重试此失败消息的时候,其他的消息可能发送成功了。 |
retry.backoff.ms | 两次重试之间的时间间隔,默认是 100ms。 |
enable.idempotence | 是否开启幂等性,默认 true,开启幂等性。 |
compression.type | 生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。 |
2.2 生产者如何提高吞吐量
2.3 数据可靠性
2.4 数据去重
略。
2.5 数据有序
单分区内,有序(有条件的,不能乱序);多分区,分区与分区间无序。
2.6 数据乱序
第3章 Kafka Broker调优
3.1 Broker核心参数配置
Kafka Broker总体工作流程:心中有图,回忆。
核心参数:
参数名称 | 描述 |
---|---|
replica.lag.time.max.ms | ISR 中,如果 Follower 长时间未向 Leader 发送通信请求或同步数据,则该 Follower 将被踢出 ISR。该时间阈值,默认 30s。 |
auto.leader.rebalance.enable | 默认是 true。 自动 Leader Partition 平衡。建议关闭。 |
leader.imbalance.per.broker.percentage | 默认是 10%。每个 broker 允许的不平衡的 leader的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 默认值 300 秒。检查 leader 负载是否平衡的间隔时间。 |
log.segment.bytes | Kafka 中 log 日志是分成一块块存储的,此配置是指 log 日志划分成块的大小,默认值 1G。 |
log.index.interval.bytes | 默认4kb,kafka 里面每当写入了 4kb 大小的日志(.log),然后就往 index 文件里面记录一个索引。 |
log.retention.hours | Kafka 中数据保存的时间,默认 7 天。生成中一般设置3天 |
log.retention.minutes | Kafka 中数据保存的时间,分钟级别,默认关闭。 |
log.retention.ms | Kafka 中数据保存的时间,毫秒级别,默认关闭。 |
log.retention.check.interval.ms | 检查数据是否保存超时的间隔,默认是 5 分钟。 |
log.retention.bytes | 默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment。 |
log.cleanup.policy | 默认是 delete,表示所有数据启用删除策略; 如果设置值为 compact,表示所有数据启用压缩策略。 |
num.io.threads | 默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%。 |
num.replica.fetchers | 默认是 1。副本拉取线程数,这个参数占总核数的 50%的 1/3 |
num.network.threads | 默认是 3。数据传输线程数,这个参数占总核数的 50%的 2/3 。 |
log.flush.interval.messages | 强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。 |
log.flush.interval.ms | 每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理。 |
3.2 其他
1. 服役/退役新节点
(1)创建一个要均衡的主题。
(2)生成一个负载均衡的计划。
(3)创建副本存储计划(所有副本存储在 broker0、broker1、broker2、broker3 中)。
(4)执行副本存储计划。
(5)验证副本存储计划。
2. 增加分区
分区只能增加,不能减少。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server
hadoop102:9092 --alter --topic first --partitions 3
3. 增加副本因子
4. 手动调整分区副本存储
5. Leader Partition负载均衡
6. 自动创建主题
如果 broker 端配置参数 auto.create.topics.enable 设置为 true(默认值是 true),那么当生
产者向一个未创建的主题发送消息时,会自动创建一个分区数为 num.partitions(默认值为
1)、副本因子为 default.replication.factor(默认值为 1)的主题。除此之外,当一个消费者
开始从未知主题中读取消息时,或者当任意一个客户端向未知主题发送元数据请求时,都会
自动创建一个相应主题。这种创建主题的方式是非预期的,增加了主题管理和维护的难度。
生产环境建议将该参数设置为 false。