Kafka Producer发送消息流程之消息异步发送和同步发送

文章目录

  • 1. 异步发送
  • 2. 同步发送

在这里插入图片描述

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。

public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordproducer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");}//关闭producerproducer.close();}
}

Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。

2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。

2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。

按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。

public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建producerHashMap<String, Object> config = new HashMap<>();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);for (int i = 0; i < 10; i++) {//创建recordProducerRecord<String, String> record = new ProducerRecord<String, String>("test2",""+i,"我是你爹"+i);//发送recordFuture<RecordMetadata> send = producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println("回调信息:消息发送成功");}});System.out.println("发送数据");send.get();}//关闭producerproducer.close();}
}
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3247688.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

linux搭建mysql主从复制(一主一从)

目录 0、环境部署 1、主服务器配置 1.1 修改mysql配置文件 1.2 重启mysql 1.3 为从服务器授权 1.4 查看二进制日志坐标 2、从服务器配置 2.1 修改mysql配置文件 2.2 重启mysql 2.3 配置主从同步 2.4 开启主从复制 3、验证主从复制 3.1 主服务器上创建test…

单片机开发中,如何在断电前保存数据到dataflash?

在单片机开发中&#xff0c;保存数据到 DataFlash&#xff08;数据闪存&#xff09;是一项常见任务&#xff0c;尤其是在断电前需要保留重要数据时。 我收集归类了一份嵌入式学习包&#xff0c;对于新手而言简直不要太棒&#xff0c;里面包括了新手各个时期的学习方向编程教学…

多线程实现方式和常用方法

1 进程和线程 进程Process&#xff1a; 每个进程都有独立的代码和数据空间&#xff08;进程上下文&#xff09;&#xff0c;进程间的切换会有较大的开销&#xff0c;一个进程包含1--n个线程。可以把进程简单理解为操作系统中运行的一个程序。 线程Thread&#xff1a;同一类线程…

音视频开发入门教程(2)配置FFmpeg编译 ~共210节

在上一篇博客介绍了安装&#xff0c;音视频开发入门教程&#xff08;1&#xff09;如何安装FFmpeg&#xff1f;共210节-CSDN博客 感兴趣的小伙伴&#xff0c;可以继续跟着老铁&#xff0c;一起开始音视频剪辑功能&#xff0c;&#x1f604;首先查看一下自己的电脑是几核的&…

【代码规范】out = model(data)和out = model.forward(data.detach())的相似性和区别

【代码规范】out model(data)和out model.forward(data.detach())的相似性和区别 一、out model(data)和out model.forward(data.detach())的功能 二、out model(data)和out model.forward(data.detach())的区别 三、推理攻击下使用哪一个 文章目录 一、out model(data)…

Keka for Mac v1.4.3 中文下载 解压/压缩工具

Mac分享吧 文章目录 效果一、下载软件二、开始安装1、双击运行软件&#xff0c;将其从左侧拖入右侧文件夹中&#xff0c;等待安装完毕2、应用程序显示软件图标&#xff0c;表示安装成功 三、运行测试1、打开软件2、文件访问权限修改3、访达扩展 安装完成&#xff01;&#xff…

ppt文本框复制到word自动缩进的问题

ppt里的字是无缩进的&#xff1a; 复制粘贴到word中&#xff0c;突然出现2字符缩进&#xff1a; 微软官方嘴硬说没问题我也是无语&#xff01;&#xff01;word保留原格式复制后&#xff0c;出现莫名其妙的缩进 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直…

Kafka Producer发送消息流程之Sender发送线程和在途请求缓存区

文章目录 1. Sender发送数据1. 发送数据的详细过程&#xff1a;2. 关键参数配置 2. 在途请求缓存区 1. Sender发送数据 Sender线程负责将已经在RecordAccumulator中准备好的消息批次发送到Kafka集群。虽然消息在RecordAccumulator中是按照分区组织的&#xff0c;但Sender线程在…

强化学习的数学原理(2)

Value iteration & Policy itreation Value iteration algorithm 之前我们已经讲过怎么去求解贝尔曼最优公式&#xff0c;是利用contraction mapping theorem 来进行求解&#xff0c;我们知道这个contraction mapping theorem是一个迭代算法&#xff0c;实际上这个算法他有…

Android中OkHttp3中超时时间概述

目录 前言connectTimeoutreadTimeoutwriteTimeoutcallTimeoutpingInterval拓展 前言 可以看到&#xff0c;使用还是很简单的。主要相关的有这五个参数&#xff0c;其中我们常用到是就是connectTimeout、readTimeout和writeTimeout。 再看上图&#xff0c;可以看到默认下connec…

独立游戏《星尘异变》UE5 C++程序开发日志6——实现存档和基础设置系统

目录 一、存档类 1.创建一个SaveGame类 2.存储关卡内数据 3.加载关卡数据 4.关于定时器 5.存储全局数据 6.加载全局数据 二、存档栏 1.存档栏的数据结构 2.创建新存档 3.覆盖已有存档 4.删除存档 三、游戏的基础设置 1.存储游戏设置的数据结构 2.初始化设置 3.…

链表面试练习习题集(Java)

1. 思路&#xff1a; 因为杨辉三角是由二维数组构成&#xff0c;所以要先创建一个二维数组&#xff0c;如何用顺序表表示二维数组&#xff0c;可以通过List<List<Interger>>来表示一个二维数组&#xff0c;可以这样理解&#xff1a;先创建一个一维数组List&#x…

智慧消防建设方案(完整方案参考PPT)

智慧消防系统建设方案旨在通过物联网、大数据与云计算技术&#xff0c;集成火灾自动报警、智能监控、应急指挥等功能于一体。方案部署智能传感器监测火情&#xff0c;实时数据分析预警&#xff0c;实现火灾早发现、早处置。构建可视化指挥平台&#xff0c;优化应急预案&#xf…

Redis之List列表

目录 一.列表讲解 二.列表命令 三.内部编码 四.应用场景 Redis的学习专栏&#xff1a;http://t.csdnimg.cn/a8cvV 一.列表讲解 列表类型是用来存储多个有序的字符串&#xff0c;如下所示&#xff0c;a、b、c、d、e五个元素从左到右组成了一个有序的列表&#xff0c;列表中的…

android R ext4 image打包脚本介绍

一、Android R打包指令使用介绍 &#xff08;1&#xff09;mkuserimg_mke2fs #./mkuserimg_mke2fs --help usage: mkuserimg_mke2fs [-h] [--android_sparse] [--journal_size JOURNAL_SIZE][--timestamp TIMESTAMP] [--fs_config FS_CONFIG][--product_out PRODUCT_OUT][--b…

目标检测入门:4.目标检测中的一阶段模型和两阶段模型

在前面几章里&#xff0c;都只做了目标检测中的目标定位任务&#xff0c;并未做目标分类任务。目标检测作为计算机视觉领域的核心人物之一&#xff0c;旨在从图像中识别出所有感兴趣的目标&#xff0c;并确定它们的类别和位置。现在目标检测以一阶段模型和两阶段模型为代表的。…

常见漏洞之SSRF

一、SSRF简介 服务器端请求伪造&#xff08;SSRF&#xff09;是一种安全漏洞&#xff0c;允许攻击者通过构造恶意请求并利用存在缺陷的Web应用作为代理&#xff0c;向内外网发送请求&#xff0c;以实现攻击目的。SSRF攻击主要利用了服务端提供的某些功能&#xff0c;这些功能能…

基于jeecgboot-vue3的Flowable流程仿钉钉流程设计器-支持VForm3表单的选择与支持

因为这个项目license问题无法开源&#xff0c;更多技术支持与服务请加入我的知识星球。 1、初始化的时候加载表单 /** 查询表单列表 */ const getFormList () > {listForm().then(res > formOptions.value res.result.records) } 2、开始节点的修改&#xff0c;增加表…

【转盘案例-开始选号按钮-旋转 Objective-C语言】

一、接下来,我们来说这个“开始选号”按钮, 1.我们之前已经可以自旋转了,当我点击开始选号按钮之后,我让它快速的去旋转,5圈儿,然后停在最上方, 我先把ViewController的startRotate这句话啊,注释掉,先不让它自旋转呢, 把这句话注释掉, 接下来,我们command + R, …

Java---抽象类

乐观学习&#xff0c;乐观生活&#xff0c;才能不断前进啊&#xff01;&#xff01;&#xff01; 我的主页&#xff1a;optimistic_chen 我的专栏&#xff1a;c语言 &#xff0c;Java 欢迎大家访问~ 创作不易&#xff0c;大佬们点赞鼓励下吧~ 文章目录 抽象类什么的抽象类&…