大数据-55 Kafka sh脚本使用 与 JavaAPI使用 topics.sh producer.sh consumer.sh kafka-clients

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka (正在更新…)

章节内容

上节我们完成了:

  • Kafka介绍
  • ZK的基本环境
  • Kafka下载解压配置
  • Kafka启动配置
  • Kafka启动服务

在这里插入图片描述

Kafka启动

上节我们通过sh脚本启动,但是当我们的SSH关闭的时候,Kafka服务也退出。
这里我们可以使用 Kakfa 的守护进程的方式启动,就可以在后台运行了。

kafka-server-start.sh -daemon /opt/servers/kafka_2.12-2.7.2/config/server.properties

启动之后,我们可以通过 ps 工具看到:

ps aux | grep kafka

返回结果如下图:
在这里插入图片描述

sh脚本使用

topics.sh

kakfa-topics.sh 用于管理主题

查看所有

kafka-topics.sh --list --zookeeper h121.wzk.icu:2181

当前执行返回的是空的,因为我们没有任何主题。

创建主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_1 --partitions 1 --replication-factor 1

执行结果中,我们可以观察到,已经顺利的完成了。
在这里插入图片描述

查看主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --describe --topic wzk_topic_1

执行结果中,我们可以观察到,已经顺利的完成了。
在这里插入图片描述

删除主题

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --delete --topic wzk_topic_1

在这里插入图片描述

新建主题(用于测试)

kafka-topics.sh --zookeeper h121.wzk.icu:2181 --create --topic wzk_topic_test --partitions 1 --replication-factor 1

producer.sh

kafka-console-producer.sh 用于生产消息

生成数据

kafka-console-producer.sh --topic wzk_topic_test --broker-list h121.wzk.icu:9092

手动生成一批数据来进行测试:
在这里插入图片描述

consumer.sh

kafka-console-consumer.sh 用于消费消息

消费数据

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test

此时,我们需要再开启一个 Producer 产生数据,它才会继续消费。

从头消费

kafka-console-consumer.sh --bootstrap-server h121.wzk.icu:9092 --topic wzk_topic_test --from-beginning

从头开始消费的话,我们可以看到消费者已经把刚才我们写入的数据都消费了
在这里插入图片描述

Java API

架构图

在这里插入图片描述

POM

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.7.2</version>
</dependency>

生产者1测试


public class TestProducer01 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test",0, 0,"hello world by java!");Future<RecordMetadata> future = producer.send(record);future.get(3_000, TimeUnit.SECONDS);producer.close();}}

生产者1运行

  2024-07-12 11:53:11,542 INFO [org.apache.kafka.clients.producer.ProducerConfig] - ProducerConfig values: acks = 1batch.size = 16384bootstrap.servers = [h121.wzk.icu:9092]buffer.memory = 33554432client.dns.lookup = use_all_dns_ipsclient.id = producer-1compression.type = noneconnections.max.idle.ms = 540000delivery.timeout.ms = 120000enable.idempotence = falseinterceptor.classes = []internal.auto.downgrade.txn.commit = falsekey.serializer = class org.apache.kafka.common.serialization.IntegerSerializerlinger.ms = 0max.block.ms = 60000max.in.flight.requests.per.connection = 5max.request.size = 1048576

运行结果如下图:
在这里插入图片描述

生产者2测试

public class TestProducer02 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");configs.put("acks", "1");KafkaProducer<Integer, String> producer = new KafkaProducer<>(configs);ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test",0, 0,"hello world by java! CallBack test!");producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e != null) {System.out.println("主题: " + recordMetadata.topic() + ", " +"分区: " + recordMetadata.partition() + ", " +"时间戳: " + recordMetadata.timestamp());} else {System.out.println("生产消息异常!!!");}}});producer.close();}}

运行之后,控制台输出:

2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka version: 2.7.2
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka commitId: 37a1cc36bf4d76f3
2024-07-12 12:46:48,795 INFO [org.apache.kafka.common.utils.AppInfoParser] - Kafka startTimeMs: 1720759608792
2024-07-12 12:46:49,200 INFO [org.apache.kafka.clients.Metadata] - [Producer clientId=producer-1] Cluster ID: DGjwPmfLSk2OKosFFLZJpg
2024-07-12 12:46:49,209 INFO [org.apache.kafka.clients.producer.KafkaProducer] - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
主题: wzk_topic_test, 分区: 0, 时间戳: 1720759609201
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics scheduler closed
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Closing reporter org.apache.kafka.common.metrics.JmxReporter
2024-07-12 12:46:49,282 INFO [org.apache.kafka.common.metrics.Metrics] - Metrics reporters closed
2024-07-12 12:46:49,283 INFO [org.apache.kafka.common.utils.AppInfoParser] - App info kafka.producer for producer-1 unregistered

运行的之后的控制台如下:
在这里插入图片描述

消费者01运行


public class TestConsumer01 {public static void main(String[] args) throws Exception {Map<String, Object> configs = new HashMap<>();configs.put("bootstrap.servers", "h121.wzk.icu:9092");configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");configs.put("group.id", "wzk-test");KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(configs);final List<String> topics = Arrays.asList("wzk_topic_test");consumer.subscribe(topics, new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> collection) {collection.forEach(item -> {System.out.println("剥夺的分区: " + item.partition());});}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> collection) {collection.forEach(item -> {System.out.println("接收的分区: " + item.partition());});}});final ConsumerRecords<Integer, String> records = consumer.poll(3_000);final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");topic1Iterable.forEach(record -> {System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));System.out.println("消息的key:" + record.key());System.out.println("消息的偏移量:" + record.offset());System.out.println("消息的分区号:" + record.partition());System.out.println("消息的序列化key字节数:" + record.serializedKeySize());System.out.println("消息的序列化value字节数:" + record.serializedValueSize());System.out.println("消息的时间戳:" + record.timestamp());System.out.println("消息的时间戳类型:" + record.timestampType());System.out.println("消息的主题:" + record.topic());System.out.println("消息的值:" + record.value());});consumer.close();}}

消费者01测试

2024-07-12 13:00:17,456 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Adding newly assigned partitions: wzk_topic_test-0
接收的分区: 0
2024-07-12 13:00:17,480 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] - [Consumer clientId=consumer-wzk-test-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=12, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[h121.wzk.icu:9092 (id: 0 rack: null)], epoch=0}}
消息头字段:[]
消息的key:0
消息的偏移量:12
消息的分区号:0
消息的序列化key字节数:4
消息的序列化value字节数:20
消息的时间戳:1720760404260
消息的时间戳类型:CreateTime
消息的主题:wzk_topic_test
消息的值:hello world by java!

控制台运行截图如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3281037.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

如何有效管理众多账号密码:选择适合你的密码管理工具

在如今的数字化时代&#xff0c;我们的生活几乎离不开各种互联网应用和服务。从社交媒体到在线银行&#xff0c;从购物网站到工作平台&#xff0c;每个应用都要求我们注册账号并设置密码。 随着账号数量的不断增加&#xff0c;管理这些密码成为了一个令人头疼的问题。幸运的是…

【运维指南】常见的防火墙端口操作

每当一个应用程序想通过网络访问自己时&#xff0c;它就会申请一个 TCP/IP 端口&#xff0c;这意味着该端口不能被其他任何程序使用。那么&#xff0c;如何检查开放的端口&#xff0c;看看哪个应用程序已经在使用它呢&#xff1f; Windows 查看端口使用情况和进程名称 netst…

免费电子书网站

1.鸠摩搜书:https://www.jiumodiary.com/ 首页 白天模式 夜间模式(个人更喜欢白天的) 评分:☆☆☆☆☆ 1.网站简洁,刚开始只有一个搜索框。 2.内容多,而且有azw3、PDF、mobi、TXT、doc等6种格式提供下载 3.Last but not the least !!! 完全免费,不限制下载次数。…

QT+OpenGL绘制一个更加清晰的三维坐标系和图例

绘制图例 图例绘制有两种&#xff1a; 1. 设置多个颜色绘制 2.随机100个值&#xff08;自己可设置&#xff09;绘制 class CPointLegend : public CLegend { public:static CPointLegend& getInstance() {/*c11支持&#xff0c;线程安全的单例模式*/static CPointLegend …

Linux虚拟化技术KVM

文章目录 虚拟化基础什么是虚拟化虚拟化优势虚拟机虚拟机的主要特征Hypervisor类型类型1&#xff1a;裸金属型类型2&#xff1a;宿主型 KVM概述KVM体系结构KVM模块载入后的系统运行模式KVM集中管理和控制宿主机环境准备 安装KVM工具包libvirt包功能libvirt结构图安装KVM相关包C…

Linux AMBA 驱动:DMA 控制器 PL330 驱动简析

文章目录 1. 前言2. 背景3. PL330 简介4. PL330 驱动加载流程4.1 PL330 设备注册流程4.2 PL330 驱动加载流程 5. 小结6. 参考资料 1. 前言 限于作者能力水平&#xff0c;本文可能存在谬误&#xff0c;因此而给读者带来的损失&#xff0c;作者不做任何承诺。 2. 背景 本文基于…

鸿蒙对接极光推送时候报错1000900010,厂商token获取失败

在AppGallery Connect上配置项目的调试证书&#xff0c;然后手动导入&#xff0c;不要用IDE的自动构建证书&#xff1a; https://developer.huawei.com/consumer/cn/service/josp/agc/index.html#/

探索Python的加速神器:CyToolz,让数据处理快如闪电!

文章目录 探索Python的加速神器&#xff1a;CyToolz&#xff0c;让数据处理快如闪电&#xff01;背景&#xff1a;为何选择CyToolz&#xff1f;CyToolz是什么&#xff1f;如何安装CyToolz&#xff1f;五个简单函数的使用方法1. cytoolz.curry2. cytoolz.map3. cytoolz.reduce4.…

(十)联合概率数据互联原理及应用(JPDA)

目录 前言 一、JPDA原理及算法步骤 &#xff08;一&#xff09;算法步骤 1.确认矩阵计算 2.确认矩阵拆分 3.互联概率计算 4.状态及协方差更新 二、仿真验证 &#xff08;一&#xff09;模型构建 &#xff08;二&#xff09;仿真结果 总结 引用文献 前言 本文主要针…

微软:警惕利用VMware ESXi进行身份验证绕过攻击

微软于7月29日发布警告&#xff0c;称勒索软件团伙正在积极利用 VMware ESXi 身份验证绕过漏洞进行攻击。 该漏洞被追踪为 CVE-2024-37085&#xff0c;由微软安全研究人员 Edan Zwick、Danielle Kuznets Nohi 和 Meitar Pinto 发现&#xff0c;并在 6 月 25 日发布的 ESXi 8.0 …

吴恩达机器学习C1W2Lab05-使用Scikit-Learn进行线性回归

前言 有一个开源的、商业上可用的机器学习工具包&#xff0c;叫做scikit-learn。这个工具包包含了你将在本课程中使用的许多算法的实现。 目标 在本实验中&#xff0c;你将: 利用scikit-learn实现使用梯度下降的线性回归 工具 您将使用scikit-learn中的函数以及matplotli…

c#中使用数据验证器

前言 在很多情况下&#xff0c;用户的输入不一定满足我们的设计要求&#xff0c;需要验证输入是否正确&#xff0c;传统的方案是拿到控件数据进行逻辑判定验证后&#xff0c;给用户弹窗提示。这种方法有点职责延后的感觉&#xff0c;数据视图层应该很好的处理用户的输入。使用…

STM32DMA数据传输

我估计大多数人学这么久连听说都没听说过DMA&#xff0c;更不用提知道它是干嘛的。其实DMA的本质就是一个数据的搬运工。平常的时候当我们没有配置的时候&#xff0c;一直都是CPU在搬运数据&#xff0c;但是这个活又累又没有技术含量&#xff0c;所以DMA的重要性还是有的。 1.…

YOLOv9最新最全代码复现(论文复现)

YOLOv9最新最全代码复现&#xff08;论文复现&#xff09; 本文所涉及所有资源均在传知代码平台可获取 文章目录 YOLOv9最新最全代码复现&#xff08;论文复现&#xff09;引言YOLOv9模型概述模型框架图环境搭建及训练推理环境配置数据集准备训练过程测试和评估实践应用 报错修…

【机器学习西瓜书学习笔记——神经网络】

机器学习西瓜书学习笔记【第四章】 第五章 神经网络5.1神经元模型5.2 感知机与多层网络学习感知机学习率成本/损失函数梯度下降 5.3 BP神经网络&#xff08;误差逆传播&#xff09;5.4 全局最小与局部极小5.5 其他常见神经网络RBF网络RBF 与 BP 最重要的区别 ART网络 第五章 神…

Vue组件库移动端预览实现原理

引言 大家如果使用过移动端组件库&#xff08;比如&#xff1a;Vant&#xff09;&#xff0c;会发现在网站右侧有一个手机端的预览效果。 而且这个手机端预览的内容和外面的组件代码演示是同步的&#xff0c;切换组件的时候&#xff0c;移动端预览的内容也会发生相应的变化。 …

守护线程(Daemon Threads)详解:与非守护线程的区别

守护线程&#xff08;Daemon Threads&#xff09;详解&#xff1a;与非守护线程的区别 1、守护线程是什么&#xff1f;2、守护线程与非守护线程的区别2.1 JVM关闭行为2.2 任务性质2.3 线程设置2.4 示例代码 3、总结 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收…

pytorch 绘制Depth Anything网络结构

pytorch 绘制模型的网络结构有很多中方法&#xff0c;个人比较喜欢 torchview 生成的 Graphviz 风格的图片。 Graphviz介绍 Graphviz是一款开源的图形可视化软件&#xff0c;其名称来源于“Graph Visualization Software”的缩写。它通过使用一种名为DOT的描述语言来定义图形…

不踩雷的护眼大路灯有哪些?五款盲选不踩雷的护眼大路灯推荐

不踩雷的护眼大路灯有哪些&#xff1f;作为一名专业的实测博主温馨提示大家&#xff0c;虽然护眼落地灯是个好东西&#xff0c;它能够提供柔和舒适的环境光&#xff0c;减少对眼睛的伤害&#xff0c;但是千万别乱买跟风&#xff0c;盲目入手踩雷率80%以上。那么如何辨别一盏护眼…

创客项目秀 | 基于 XIAO 开发板的语音向导

背景 柴火创客空间作为大湾区科技创新的窗口&#xff0c;每年到访空间的社区伙伴众多&#xff0c;为了更好的进行空间信息交互&#xff0c;我们希望有一个装置是可以解决&#xff1a;当空间管理员不在现场的时候&#xff0c;到访者可以通过装置获得清晰的介绍与引导。 为了解…