Kafka应用Demo:按主题订阅消费消息

安装环境

  Kafka安装可参考官方网站的指导(https://kafka.apache.org/quickstart), 按步骤解压压缩包,修改配置。然后再启动zookeeper和kafka-server即可。

  需要注意的一点:如果是在VMware虚拟机上启动的kafka, 需要修改一下server.properties配置文件,增加如下配置:
在这里插入图片描述
  advertised.listener指定访问kafka的IP和端口,IP设置为虚拟机暴露给外部访问的IP。通过本地代码连接kafka,需要使用该配置

生产者代码样例

public class KafkaProducerService {private static final String NEO_TOPIC = "elon-topic";private KafkaProducer<String, String> producer = null;public KafkaProducerService() {Properties props = new Properties();props.put("bootstrap.servers", "192.168.5.128:9092");props.put("acks", "0");props.put("group.id", "1111");props.put("retries", "2");//设置key和value序列化方式props.put("key.serializer", StringSerializer.class);props.put("value.serializer", StringSerializer.class);//生产者实例producer = new KafkaProducer<>(props);}/*** 外部调用的发消息接口*/public void sendMessage() {for (int i = 0; i < 10; ++i) {int p = i % 2;ProducerRecord<String, String> record = new ProducerRecord(NEO_TOPIC, p, "neo", JSON.toJSONString(i));producer.send(record);}}
}

 发送消息时,将10个数据分别发送到0分区和1分区。

消费者代码样例

public class KafkaConsumerService {private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);private static final String NEO_TOPIC = "elon-topic";Properties properties = new Properties();private KafkaConsumer consumer = null;public KafkaConsumerService() {properties.put("bootstrap.servers","192.168.5.128:9092");  // 指定 Brokerproperties.put("group.id", "neo1");              // 指定消费组群 IDproperties.put("max.poll.records", "5");properties.put("enable.auto.commit", "false");properties.put("key.deserializer", StringDeserializer.class); // 将 key 的字节数组转成 Java 对象properties.put("value.deserializer", StringDeserializer.class);  // 将 value 的字节数组转成 Java 对象consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Collections.singletonList(NEO_TOPIC));  // 订阅主题 order-eventsnew Thread(this::receiveMessage).start();}public void receiveMessage() {try {while (true) {synchronized (this) {ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));LOGGER.info("Fetch record num:{}", records.count());for (ConsumerRecord<String,String> record: records) {String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",record.topic(), record.partition(), record.offset(), record.key(), record.value());LOGGER.info("Received:" + info);Thread.sleep(100);}consumer.commitSync();}}} catch (Exception e){} finally {consumer.close();}}

 消费者按主题订阅。从打印的结果可以看到,消费者循环从topic下取出各个分区的消息依次消费。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3018554.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

cmake进阶:目录属性之 INCLUDE_DIRECTORIES说明二

一. 简介 前面几篇文章学习了 cmake的一些目录属性&#xff0c;主要有两个重要的目录属性INCLUDE_DIRECTORIES 属性、LINK_DIRECTORIES 属性。文章如下&#xff1a; cmake进阶&#xff1a;目录属性之 INCLUDE_DIRECTORIES-CSDN博客 本文学习 父目录的 INCLUDE_DIRECTORIES …

<网络安全>《79 概念讲解<第十二课 物联网常用协议-(远距离非蜂窝网络)-终端设备>》

协议简称全称名称内容说明ZigBee也称紫蜂低速短距离传输的无线通信协议一种高可靠的无线数传网络&#xff0c;主要特色有低速、低耗电、低成本、支持大量网上节点、支持多种网上拓扑、低复杂度、快速、可靠、安全。ZigBee技术是一种新型技术&#xff0c;主要是依靠无线网络进行…

JAVA IO/NIO 知识点总结

一、常见 IO 模型简介 1. 阻塞IO模型 最传统的一种IO模型&#xff0c;即在读写数据过程中会发生阻塞现象。当用户线程发出IO请求之后&#xff0c;内核会去查看数据是否就绪&#xff0c;如果没有就绪就会等待数据就绪&#xff0c;而用户线程就会处于阻塞状态&#xff0c;用户线…

Linux 命令查看服务器信息

1.查看 CPU 信息 lscpu2.查看内存信息 cat /proc/meminfo |grep MemTotal查看逻辑 CPU个数 cat /proc/cpuinfo | grep "processor"3.查看磁盘信息 df -hl

深入大模型量化技术,大模型端侧落地已Ready?

揭秘未来&#xff1a;大模型量化技术如何革新移动AI应用 ©作者|饮水机 来源|神州问学 前言 最近&#xff0c;苹果发布了OpenELM系列模型&#xff0c;参数规模分别为270M、450M、1.1B和3B。与此同时&#xff0c;微软也推出了Phi-3系列模型&#xff0c;其中mini版本的参数…

数字孪生技术在垃圾焚烧处理中的可视化应用

在迈向智慧城市的进程中&#xff0c;数字孪生技术在垃圾处理领域展现出了巨大潜力。特别是在垃圾焚烧过程的管理和优化上&#xff0c;数字孪生垃圾焚烧可视化技术已成为一项革命性的进步。 通过 HT 构建虚拟的垃圾焚烧模型&#xff0c;实时映射和模拟实际焚烧过程中的各项关键…

Android Studio查看xml文件的修改时间和记录

Android Studio查看xml文件的修改时间和记录 Android Studio里面如果是Java/Kotlin编写界面&#xff0c;可以点击函数开头上面的提交在直接&#xff0c;然后在编辑界面的左侧查看历史时间上的修改记录&#xff0c;但是xml文件里面没有直观的这样操作方式。 但xml里面可以通过快…

专业的保密网文件导入导出系统,让文件流转行为更可控安全

军工单位因其涉及国防安全和军事机密&#xff0c;对保密工作有极高的要求&#xff0c;通常会采取严格的网络隔离措施来保护敏感信息和提高网络安全性。常见的方式是通过物理隔离将网络彻底分隔开来&#xff0c;比如保密网和非保密网。网络隔离后&#xff0c;仍有数据交换的需求…

怎么在家访问公司内网?

在当前的疫情情况下&#xff0c;越来越多的公司开始允许员工在家办公&#xff0c;这就需要解决一个问题&#xff1a;如何在家访问公司的内网资源呢&#xff1f;今天我将介绍一种解决方案——使用【天联】组网&#xff0c;它具有许多优势。 【天联】组网的优势 无网络限制&#…

Pytorch 实现情感分析

情感分析 情感分析是 NLP 一种应用场景&#xff0c;模型判断输入语句是积极的还是消极的&#xff0c;实际应用适用于评论、客服等多场景。情感分析通过 transformer 架构中的 encoder 层再加上情感分类层进行实现。 安装依赖 需要安装 Poytorch NLP 相关依赖 pip install t…

迅为RK3568开发板资料说明4750+页专属文档专为3568编写

iTOP-3568开发板采用瑞芯微RK3568处理器&#xff0c;内部集成了四核64位Cortex-A55处理器。主频高达2.0Ghz&#xff0c;RK809动态调频。集成了双核心架构GPU&#xff0c;ARM G52 2EE、支持OpenGLES1.1/2.0/3.2、OpenCL2.0、Vulkan1.1、内嵌高性能2D加速硬件。 内置独立NPU,算力…

探索希尔排序算法:优雅而高效的增量排序

在计算机科学领域&#xff0c;排序算法是一项至关重要的技术&#xff0c;在各种应用场景中都扮演着重要角色。而希尔排序算法作为一种增量排序方法&#xff0c;在实际应用中展现了其优雅而高效的特性。本文将深入探讨希尔排序算法的原理、实现细节以及优化方法&#xff0c;带您…

YOLOv8改进 | 主干篇 | 2024.5全新的移动端网络MobileNetV4改进YOLOv8(含MobileNetV4全部版本改进)

一、本文介绍 本文给大家带来的改进机制是MobileNetV4&#xff0c;其发布时间是2024.5月。MobileNetV4是一种高度优化的神经网络架构&#xff0c;专为移动设备设计。它最新的改动总结主要有两点&#xff0c;采用了通用反向瓶颈&#xff08;UIB&#xff09;和针对移动加速器优化…

重庆市某区智慧园林绿化管理信息系统可行性研究及概算方案

第一章 项目需求分析 1 政务目标分析 智慧园林绿化管理信息系统建设将以管理精细化、服务智慧化、决策科学化、品质高端化为指引&#xff0c;以提升园林绿化能力为重点&#xff0c;以体制机制创新为保障&#xff0c;建成后实现园林绿化行业的全覆盖&#xff0c;建成汇聚园林绿…

jenkins+gitlab+ansible-tower实现发布

前提准备&#xff1a; gitlab中上传相应的jenkinsfile文件和源码。 安装和破解ansible-tower。 安装jenkins。 大致流程&#xff1a;从gitlab中拉取文件&#xff0c;存放到windows机器上&#xff0c;使用nuget等进行打包到windows中&#xff0c;使用sshPublisher语句传输到远程…

[Linux] git工具的安装和使用

目录 前言 安装 1.构建仓库 2.将仓库克隆到本地 使用 1.三板斧 1.git add 新增 2.git commit 提交 3.git push 推送 2.常用指令 前言 git 是一个代码托管平台&#xff0c;它的创始人是大名鼎鼎的 Linux 之父&#xff1a; 林纳斯托瓦兹&#xff0c;git的诞生可以使我们对…

Redis 实战之事务的实现

事务的实现 事务开始命令入队事务队列执行事务总结 一个事务从开始到结束通常会经历以下三个阶段&#xff1a; 1、 事务开始&#xff1b; 2、 命令入队&#xff1b; 3、事务执行。 本节接下来的内容将对这三个阶段进行介绍&#xff0c; 说明一个事务从开始到结束的整个过程。 …

AI大模型探索之路-训练篇17:大语言模型预训练-微调技术之QLoRA

系列篇章&#x1f4a5; AI大模型探索之路-训练篇1&#xff1a;大语言模型微调基础认知 AI大模型探索之路-训练篇2&#xff1a;大语言模型预训练基础认知 AI大模型探索之路-训练篇3&#xff1a;大语言模型全景解读 AI大模型探索之路-训练篇4&#xff1a;大语言模型训练数据集概…

图片8位, 16位,24位,32位原理,以及如何进行补位互转

写在前面&#xff1a;之前一直没有这个概念&#xff0c;以为像素就是十六进制如 #FFFFFF&#xff0c;或者rgb(255,255,255) 即可实现颜色定义&#xff0c;理解相当肤浅&#xff0c;接触到一个物联网项目&#xff0c;写底层的童鞋让我把16位如 0*FFFF转为24位去显示在浏览器&…

华为OD机试 - 找磨损度最高和最低的硬盘 - 优先队列(Java 2024 C卷 100分)

华为OD机试 2024C卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷C卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;每一题都有详细的答题思路、详细的代码注释、样例测试…