Python操作Kafka基础教程

01 Python操作Kafka基础教程

创建ZooKeeper容器

docker run -d --name zookeeper -p 2181:2181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

创建Kafka容器

语法是:

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=[你的IP地址]:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://[你的IP地址]:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

我的虚拟机IP是192.168.31.86,所以我的命令是:

docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.31.86:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.86:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

安装可视化工具

下载UI工具:https://kafkatool.com/download2/offsetexplorer_64bit.exe

下载好以后按照默认进行安装。

在这里插入图片描述

在这里插入图片描述

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

在这里插入图片描述

在这里插入图片描述

连接Kafka

搜索软件并打开:

在这里插入图片描述

在这里插入图片描述

配置zookeeper:

在这里插入图片描述

配置Kafka:

在这里插入图片描述

点击Test测试按钮,测试是否能够连接Kafka:

在这里插入图片描述

点击是,然后就成功的使用客户端连接上Kafka了。

在这里插入图片描述

安装依赖

安装Python3.8

安装:

pip install kafka-python==2.0.2

发布和消费json数据

生产者

from kafka import KafkaProducer
import json# 创建生产者
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 要提交的消息
msg_dict = {"operatorId": "test",  # 公交公司ID"terminalId": "123",  # 设备Id"terminalCode": "123",  # 设备编码(使用车辆ID)"terminalNo": "1",  # 同一车辆内terminal序号从1开始
}# 向指定的主题发送消息
producer.send("text1", msg_dict)
producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print(msg.value.decode())

发布和消费文本数据

生产者

from kafka import KafkaProducer# 创建生产者
producer = KafkaProducer(value_serializer=lambda v: v.encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 向指定的主题发送消息
producer.send("text1", "你好")
producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print(msg.value.decode())

发布和消费键值对文本数据

生产者

from kafka import KafkaProducer# 创建生产者
producer = KafkaProducer(key_serializer=lambda v: v.encode('utf-8'),value_serializer=lambda v: v.encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 向指定的主题发送消息
producer.send("text1", key="msg", value="你好")
producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print("key=", msg.key.decode())print("value=", msg.value.decode())

发布和消费键值对JSON数据

生产者

from kafka import KafkaProducer
import json# 创建生产者
producer = KafkaProducer(key_serializer=lambda v: json.dumps(v).encode('utf-8'),value_serializer=lambda v: json.dumps(v).encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 向指定的主题发送消息
key = {"a": 1}
value = {"b": 2}
producer.send("text1", key=key, value=value)
producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print("key=", msg.key.decode())print("value=", msg.value.decode())

发布和消费压缩文本数据

生产者

from kafka import KafkaProducer# 创建生产者
producer = KafkaProducer(value_serializer=lambda v: v.encode('utf-8'),bootstrap_servers=['127.0.0.1:9092'],compression_type='gzip',  # 通过此参数声明要压缩数据传输
)# 向指定的主题发送消息
producer.send("text1", "你好")
producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer('text1', bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print(msg.value.decode())

同时消费多个主题

生产者

from kafka import KafkaProducer# 创建生产者
producer = KafkaProducer(value_serializer=lambda v: v.encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 向指定的主题发送消息
producer.send("text1", "你好")
producer.send("text2", "你好")producer.send("text1", "你好1")
producer.send("text2", "你好1")producer.send("text1", "你好2")
producer.send("text2", "你好2")producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer(bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
consumer.subscribe(["text1", "text2"])
for msg in consumer:print(msg)print(msg.topic)print(msg.value.decode())

获取发布结果

生产者

from kafka import KafkaProducer# 创建生产者
producer = KafkaProducer(value_serializer=lambda v: v.encode('utf-8'),bootstrap_servers=['127.0.0.1:9092']
)# 向指定的主题发送消息
feature = producer.send("text1", "你好")# 会阻塞,直到发送成功
print(feature.get(timeout=60))producer.close()

消费者

from kafka import KafkaConsumer# 创建消费者
consumer = KafkaConsumer("text1", bootstrap_servers='127.0.0.1:9092')# 不停的消费数据
for msg in consumer:print(msg.topic)print(msg.value.decode())

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2803919.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

观察者模式, 发布-订阅模式, 监听器模式

观察者模式, 发布-订阅模式, 监听器模式 观察者模式 观察者模式是一种行为型设计模式, 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新 角色模型和结构图 在观察者模式中,只有两种…

码农永远高薪吃香的3项特质

最近看到Google在裁员滚滚,再次对CS就业环境有了清醒认知。之前听程序员担忧裁员,还以为他杞人忧天。然而,现实就是如此寒冷彻骨啊! 当然,有些具备不可替代性的码农,永远吃香。总结发现有以下几点特质&…

vue Threejs实现任意画线(鼠标点击画线)

Threejs实现任意画线(鼠标点击画线) 鼠标左键单击添加点鼠标右键回退到上一个点,并继续画按住shift可以画平行于x轴或平行于z轴的线按Esc完成画线

蓝桥杯DP算法——区间DP(C++)

根据题意要求的是将石子合并的最小权值,我们可以根据DP思想使用二维数组f[i,j]来存放所有从第i堆石子到第j堆石子合并成一堆石子的合并方式。 然后由第二个图所示,我们可以将i到j区间分成两个区间,因为将i到j合并成一个区间的前一步一定是合…

Jmeter学习系列之六:阶梯加压线程组Stepping Thread Group详解

性能测试中,有时需要模拟一种实际生产中经常出现的情况,即:从某个值开始不断增加压力,直至达到某个值,然后持续运行一段时间。 在jmeter中,有这样一个插件,可以帮我们实现这个功能,这个插件就是:Stepping Thread Group 1、下载配置方法 1.1.下载配置 插件下载地址:…

http相关概念以及apache的功能(最详细讲解!!!!)

概念 互联网:是网络的网络,是所有类型网络的母集 因特网:世界上最大的互联网网络 万维网:www (不是网络,而是数据库)是网页与网页之间的跳转关系 URL:万维网使用统一资源定位符,…

HQYJ 2024-2-23 作业

自己实现单向循环链表的功能 整理思维导图 复习前面顺序表和链表的代码&#xff0c;重写链表逆置函数 1.实现单向循环链表的功能 loop_link_list.h文件 #ifndef __LOOP_LINK_LIST__ #define __LOOP_LINK_LIST__ #include<stdio.h> #include<stdlib.h> typedef…

Java零基础 - 三元运算符

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一个人虽可以走的更快&#xff0c;但一群人可以走的更远。 我是一名后…

SpringBoot对于SpringMVC的支持

创建项目 版本说明这里使用的 SpringBoot 2.0.0.Release SpringBoot对于SpringMVC的支持 在之前的开发中很多场景下使用的是基于xml配置文件或者是Java配置类的方式来进行SpringMVC的配置。一般来讲&#xff0c;初始的步骤如下所示 1、初始化SpringMVC的DispatcherServlet2、…

C++的string容器->基本概念、构造函数、赋值操作、字符串拼接、查找和替换、字符串比较、字符存取、插入和删除、子串

#include<iostream> using namespace std; #include <string> //string的构造函数 /* -string(); //创建一个空的字符串 例如: string str; -string(const char* s); //使用字符串s初始化 -string(const string& str); //使…

前端JS学习(二):BOM、DOM对象与事件

Web API基本认知 Web API 的作用&#xff1a;使用JS去操作html和浏览器 Web API 的分类&#xff1a;DOM(网页文档对象模型)、BOM(浏览器对象模型) BOM BOM的全称是 Browser Object Model&#xff0c;浏览器对象模型。也就是 JavaScript 将浏览器的各个组成部分封装成了对象&…

修改单据转换规则后保存报错提示

文章目录 修改单据转换规则后保存报错提示 修改单据转换规则后保存报错提示

区块链与Solidity详细介绍及基本语法使用

一、区块链简介 区块链是一种分布式数据库技术&#xff0c;它以块的形式存储数据&#xff0c;并通过加密算法确保数据的安全性。每个块包含一系列交易&#xff0c;并通过哈希值与前一个块相连接&#xff0c;形成一个链式结构。这种结构使得数据难以被篡改&#xff0c;因为任何对…

2024.2.22 C++QT 作业

思维导图 练习题 1>完善对话框&#xff0c;点击登录对话框&#xff0c;如果账号和密码匹配&#xff0c;则弹出信息对话框&#xff0c;给出提示”登录成功“&#xff0c;提供一个Ok按钮&#xff0c;用户点击Ok后&#xff0c;关闭登录界面&#xff0c;跳转到其他界面。如果账…

线程池的基础使用和执行策略

什么是线程池 线程池&#xff0c;字面意思就是一个创建线程的池子&#xff0c;它的特点就是&#xff0c;在使用线程之前&#xff0c;就一次性把多个线程创建好&#xff0c;放到"池”当中。后面需要执行任务的时候&#xff0c;直接从"线程池"当中通过线程执行。…

通俗易懂分析:Vite和Webpack的区别

1、对项目构建的理解 先从浏览器出发&#xff0c; 浏览器是由浏览器内核和JS引擎组成&#xff1b;浏览器内核编译解析html代码和css代码&#xff0c;js引擎编译解析JavaScript代码&#xff1b;所以从本质上&#xff0c;浏览器只能识别运行JavaScript、CSS、HTML代码。 而我们在…

MATLAB环境下基于短时傅里叶变换和Rényi熵的脑电信号和语音信号分析

傅里叶变换是不能很好的反映信号在时域的某一个局部范围的频谱特点的&#xff0c;这一点很可惜。因为在许多实际工程中&#xff0c;人们对信号在局部区域的特征是比较关心的&#xff0c;这些特征包含着十分有用的信息。这类信号因为在时域(或者是空间域)上具有突变的非稳定性和…

Java基于SSM+JSP的超市进销库存管理系统

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

深入了解Git

1.1 Git 的工作流程简介 克隆 Git 资源作为工作目录 在克隆的资源上添加或修改文件 如果其他人修改了&#xff0c;你可以更新资源 在提交前查看修改 提交修改 在修改完成后&#xff0c;如果发现错误&#xff0c;可以撤回提交并再次修改并提交 1.2 Git 工作区、暂存区和版…

自存 angular material design 表单输入框lable右对齐样式

单个输入框的文字lable放输入框左边实现 material design 的组件库示例没有文字描述放左边的样式 ,所以mat-lable并没有放在mat-form-field中 <div class"input-container col-6"><mat-label>商品售价<span class"text-error">*</spa…