Kafka消息队列python开发环境搭建

目录

引言

Kafka 的核心概念和组件

Kafka 的主要特性

使用场景

申请云服务器

安装docker及docker-compose

VSCODE配置

开发环境搭建

搭建Kafka的python编程环境

Kafka的python编程示例

引言

Apache Kafka 是一个分布式流处理平台,由 LinkedIn 开发并在 2011 年贡献给 Apache 软件基金会。虽然 Kafka 常被归类为消息队列(也称为消息传递系统或消息中间件),但它实际上提供了比传统消息队列更丰富的功能,特别是在处理大规模数据流方面。Kafka 最初被设计用于处理 LinkedIn 的高吞吐量日志数据,但现在已广泛应用于各种场景,包括网站活动跟踪、日志收集、实时分析、监控数据聚合以及流处理等。

Kafka 的核心概念和组件
  1. Broker(代理):Kafka 集群中的服务器被称为 broker。每个 broker 都可以独立地处理来自生产者的数据,并响应消费者的请求。

  2. Topic(主题):Kafka 中的消息被分类存储在名为 topic 的容器中。每个 topic 可以有多个分区(partition),每个分区都有序地存储消息。

  3. Partition(分区):分区是 Kafka 中实现水平扩展和容错的关键。每个分区可以分布在不同的 broker 上,同时每个分区内的消息都是有序的。

  4. Producer(生产者):生产者负责向 Kafka 集群发送消息到指定的 topic。生产者可以指定消息的键(key),Kafka 使用这个键来决定消息被发送到哪个分区。

  5. Consumer(消费者):消费者从 Kafka 集群订阅 topic 并消费数据。Kafka 支持多个消费者群组(consumer group)同时消费同一个 topic,每个消费者群组内的消费者可以共同分担处理数据的任务。

  6. Consumer Group(消费者群组):同一个消费者群组内的消费者可以并行地消费同一个 topic 的不同分区,但每个分区只能被一个消费者群组内的一个消费者消费,以确保消息的有序性。

  7. Offset(偏移量):Kafka 中的每条消息都有一个唯一的偏移量,用于标识消息在分区中的位置。消费者通过记录自己消费到的偏移量来跟踪消息的读取进度。

Kafka 的主要特性
  • 高吞吐量:Kafka 被设计用来处理高吞吐量的数据,可以轻松处理成千上万条消息/秒。
  • 可扩展性:Kafka 的集群可以通过增加更多的 broker 来水平扩展,以处理更大的数据量和更高的吞吐量。
  • 持久性:Kafka 通过将消息存储在磁盘上来保证消息的持久性,即使在服务器故障的情况下也不会丢失数据。
  • 容错性:Kafka 提供了强大的容错机制,包括自动的副本复制和数据冗余,以确保数据的可靠性和可用性。
  • 实时性:Kafka 支持实时数据处理,使得它可以用于构建实时流处理应用程序。
使用场景
  • 消息传递:作为传统的消息队列使用,支持解耦的生产者和消费者。
  • 网站活动跟踪:收集和分析用户的点击流、搜索查询等网站活动数据。
  • 日志收集:从分布式系统中收集日志数据,用于监控和分析。
  • 实时分析:对实时数据流进行实时处理和分析,以支持实时决策。
  • 事件流处理:处理实时事件流,如传感器数据、金融交易数据等。

申请云服务器

(以京东云为例,阿里云、腾讯云、华为云、天翼云类似)

注意在选择操作系统的时候选择ubuntu22.04或ubuntu20.04

管理员账户root

管理员密码:在安装的时候设置,记住密码

下载安装mobaXterm

https://mobaxterm.mobatek.net/download-home-edition.html

安装docker及docker-compose

#以下只安装一次即可!
sudo apt update
sudo apt install -y docker.io # intel x86_64
sudo curl -SL https://github.com/docker/compose/releases/download/v2.21.0/docker-compose-linux-x86_64 \-o /usr/local/bin/docker-compose
# 如果github不能访问,可用hub.njuu.cf或521github.com/镜像站替换github.com重试
sudo chmod +x /usr/local/bin/docker-compose #如报错,去掉sudo重试
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose #如报错,去掉sudo重试

查看docker和dockers是否安装好?

docker version
docker-compose version

VSCODE配置

开发环境搭建

  • 将如下文件保存为docker-compose.yml,并上传至服务器,例如/home/ubuntu/iiot/kafka

  • 将下面代码中的localhost替换为云服务公网IP

version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:latestenvironment:ZOOKEEPER_CLIENT_PORT: 2181ports:- "2181:2181"
dkafka:image: confluentinc/cp-kafka:latestcontainer_name: kafka-devdepends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1ports:- "9092:9092"
  • 上传metaldigi.zip

解压缩文件

sudo apt updatesudo apt install zipunzip metaladigi.zip

cd metaldigi/kafka
  • 启动kafka消息服务器

    • 在命令行执行消息生产者

docker-compose up -d

搭建Kafka的python编程环境

  • 进入metaldigi文件夹

  • 执行 docker ps

cd metaldigi
docker-compose up -d

Kafka的python编程示例

  • 进入metaldigi文件夹

  • 执行 docker ps

 docker exec -it metal-digi-backend bash

消息生产者

from kafka import KafkaProducer# 创建一个 Kafka 生产者实例# 这里指定了 Kafka 服务器的地址和端口
producer = KafkaProducer(bootstrap_servers='150.158.11.142:9092')# 循环发送 10 条消息到 'demo-topic' 主题
for_in range(10):
# 将要发送的消息转换为字节格式message =f'message{_}'.encode('utf-8')# 发送消息到 Kafka 的 'demo-topic' 主题producer.send('demo-topic', value=message)# 打印已发送的消息print(f'Sent message: message{_}')# 关闭生产者实例
producer.close()这段代码创建一个 Kafka 生产者,用于向 Kafka 集群发送消息。它循环发送10条消息到名为 'demo-topic' 的主题。每条消息都是一个简单的文本字符串,转换为字节格式后发送。

消息消费者

from kafka import KafkaConsumer# 创建一个 Kafka 消费者实例
# 指定 Kafka 服务器地址、端口以及其他一些配置
consumer = KafkaConsumer('demo-topic',  # 指定要消费的主题bootstrap_servers='150.158.11.142:9092',  # Kafka 服务器地址和端口auto_offset_reset='earliest',  # 从最早的消息开始消费enable_auto_commit=True,  # 自动提交偏移量group_id='demo-group'  # 消费者组标识
)# 循环消费并打印收到的消息
for message in consumer:# 解码并打印消息内容print(f"Received message: {message.value.decode()}")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3248165.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

【BUG】已解决:WslRegisterDistribution failed with error: 0x800701bc

已解决:WslRegisterDistribution failed with error: 0x800701bc 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页,我是博主英杰,211科班出身,就职于医疗科技公司,热衷分享知识,武…

Word文档恢复竟然这么简单?3个推荐方案送上!

“我很喜欢用Word进行文字创作,可是我有一次重新打开我的Word文档,却显示文档已丢失,这该怎么办呢?凝聚我多年心血的文章还有可能恢复吗?” 不论是总结学习内容还是汇报工作成果,我们总会用上Word。Word作…

level 6 day2 网络基础2

1.socket(三种套接字:认真看) 套接字就是在这个应用空间和内核空间的一个接口,如下图 原始套接字可以从应用层直接访问到网络层,跳过了传输层,比如在ubtan里面直接ping 一个ip地址,他没有经过TCP或者UDP的数…

大数据量接口响应慢-传输优化

问题 接口一次性返回大量数据,导致JSON数据大小过大,带宽大小不足,导致接口响应时间过长 解决方案 通过数据传输压缩来降低传输数据的大小,从而提高传输效率 服务器端压缩 springboot项目配置application文件,通过…

不懂U盘文件恢复?学会这4个方法点亮技能点!

“向广大网友求助:U盘里的文件意外删除了还有机会恢复吗?工作的时候不小心删除了存储在U盘里的重要文件,撤销也恢复不了,我还有其他的办法吗?” 相信大家在日常生活中,为了储存和随时携带重要的文件信息&a…

第5章 单片机的中断系统

5.1 中断的概念 5.2 中断控制系统 5.3 中断处理过程 5.4 中断的编程及应用举例 5.1 中断的概念 日常生活的中断现象举例 中断是指在突发事件到来时先中止当前正在进行的工作,转而去处理突发事件。待处理完成后,再返回到原先被中止的工作处&#xff…

状态管理的艺术:探索Flutter的Provider库

状态管理的艺术:探索Flutter的Provider库 前言 上一篇文章中,我们详细介绍了 Flutter 应用中的状态管理,以及 StatefulWidget 和 setState 的使用。 本篇我们继续介绍另一个实现状态管理的方式:Provider。 Provider优缺点 基…

【论文速读】| 涟漪下的漩涡:对启用RAG的应用程序的实证研究

本次分享论文:Vortex under Ripplet: An Empirical Study of RAG-enabled Applications 基本信息 原文作者:Yuchen Shao, Yuheng Huang, Jiawei Shen, Lei Ma, Ting Su, Chengcheng Wan 作者单位:East China Normal University, The Unive…

JVM基本知识——运行空间

JVM(Java Virtual Machine)即Java虚拟机,是负责读取java字节码,并在实际的硬件环境中运行。 JVM可以分为三部分:类装载器(ClassLoader)子系统、内存空间、执行引擎 内存空间(运行时…

高职院校人工智能人才培养成果导向系统构建、实施要点与评量方法

一、引言 近年来,人工智能技术在全球范围内迅速发展,对各行各业产生了深远的影响。高职院校作为培养高技能人才的重要基地,肩负着培养人工智能领域专业人才的重任。为了适应社会对人工智能人才的需求,高职院校需要构建一套科学、…

【STC89C51单片机】定时器/计数器的理解

目录 定时器/计数器1. 定时器怎么定时简单理解(加1经过了多少时间)什么是时钟周期什么是机器周期 2.如何设置定时基本结构相关寄存器1. TMOD寄存器2. TCON寄存器 代码示例 定时器/计数器 STC89C51单片机的定时器和计数器(Timers and Counter…

基于STM32老人摔倒报警设计

1.简介 随着我国老年人人口不断上升,我国已经进入人口老龄化,老龄人的人数加剧随着而来的就是基本的健康安全问题成为了如今社会主要解决的问题。随着已经步入信息时代,为了解决老年人的健康问题,相关技术的使用已经成为一个热门话…

JVM高频面试点

文章目录 JVM内存模型程序计数器Java虚拟机栈本地方法栈Java堆方法区运行时常量池 Java对象对象的创建如何为对象分配内存 对象的内存布局对象头实例数据对齐填充 对象的访问定位 垃圾收集器找到垃圾引用计数法可达性分析(根搜索法) 引用概念的扩充回收方…

COD论文学习 ZoomNext

现有方法的不足之处 高内在相似性:伪装物体与背景之间的高内在相似性使得检测变得困难,现有方法难以准确区分二者。多样化的规模和模糊的外观:伪装物体在规模和外观上多样化,且可能严重遮挡,导致现有方法难以处理。不…

【青书学堂】2024年第一学期 保险理论与实务(高起专) 作业

【青书学堂】2024年第一学期 保险理论与实务(高起专) 作业 为了方便日后复习,青书学堂成人大专试题整理。 若有未整理的课程,请私信我补充,欢迎爱学习的同学们收藏点赞关注!文章内容仅限学习使用!!&#xf…

「C++」类和对象(1)

欢迎来到海盗猫鸥的博客~~ 本篇我们将学习部分C中的类和对象相关知识沃~ (• ω •)ノ算我一个~ 目录 类的定义 类的定义及使用 访问限定符 类域 实例化 实例化概念: 对象大小: 内存对齐规则: 注意点&#xff…

十二.核心动画 - 动画缓冲①(框架提供的缓存函数)

引言 上一篇博客我们已经讨论了Core Animation中时间的处理,以及CAMediaTiming协议。本篇博客我们就来看一下另外一个和时间有关的机制--缓冲。 缓冲会让动画看起来更平滑更自然,我们在本篇博客将会体验一下CAAnimation提供的缓存函数,UIVi…

TCP vs UDP:网络编程的双刃剑艺术

在浩瀚的网络编程世界里,TCP(传输控制协议)与UDP(用户数据报协议)如同两位性格迥异的剑客,各自以其独特的剑法与风格,在网络通信的舞台上独领风骚。今天,就让我们深入探索这两位网络…

【DevOps系列】DevOps简介及基础环境安装

作者:后端小肥肠 目录 1. 前言 2. DevOps(详细介绍) 3. Code阶段工具 3.1 Git安装 3.2 GitLab安装 4. Build阶段工具 5. Operate阶段工具 5.1 Docker安装 5.2 Docker-Compose安装 6. Integrate工具 6.1 Jenkins介绍 6.2 Jenkins安…

Axure中继器入门:打造你的动态原型

前言 中继器 是 Axure 中的一个高级功能,它能够在静态页面上模拟后台数据交互的操作,如增加、删除、修改和查询数据,尽管它不具备真实数据存储能力。 中继器就像是一个临时的数据库,为我们在设计原型时提供动态数据管理的体验&a…