Kafka安全模式之身份认证

一、简介

Kafka作为一个分布式的发布-订阅消息系统,在日常项目中被频繁使用,通常情况下无论是生产者还是消费者只要订阅Topic后,即可进行消息的发送和接收。而kafka在0.9.0.0版本后添加了身份认证和权限控制两种安全服务,本文主要介绍在实际项目使用过程中遇到第三方kafka需身份认证时如何解决,以及对可能会碰到的问题进行总结。

二、原理介绍

Kafka身份认证主要分为以下几种:

(1)客户端与broker之间的连接认证

(2)broker与broker之间的连接认证

(3)broker与zookeeper之间的连接认证

日常项目中,无论是生产者还是消费者,我们都是作为客户端与kafka进行交互,因此使用的最多的是客户端与broker之间的连接认证。图1是客户端与服务端broker之间的认证过程图,客户端提交认证数据,服务端会根据认证数据对当前客户端进行身份校验,校验成功后的客户端即可成功登录kafka,进行后续操作。

图1 客户端与broker之间认证过程图

目前Kafka提供了SASL、SSL、Delegation Tokem三种安全认证机制,而SASL认证又分为了以下几种方式:

(1)基于Kerberos的GSSAPI

SASL-GSSAPI提供了一种非常安全的身份验证方法,但使用前提是企业中有Kerberos基础,一般使用随机密码的keytab认证方式,密码是加密的,在0.9版本中引入,目前是企业中使用最多的认证方式。

(2)SASL-PLAIN

SASL-PLAIN方式是一个经典的用户名/密码的认证方式,其中用户名和密码是以明文形式保存在服务端的JAAS配置文件中的,当客户端使用PLAIN模式进行认证时,密码是明文传输的,因此安全性较低,但好处是足够简单,方便我们对其进行二次开发,在0.10版本引入。

(3)SASL-SCRAM

SASL-SCRAM是针对SASL-PLAIN方式的不足而提供的另一种认证方式,它将用户名/密码存储在zookeeper中,并且可以通过脚本动态增减用户,当客户端使用SCRAM模式进行认证时,密码会经过SHA-256或SHA-512哈希加密后传输到服务器,因此安全性较高,在0.10.2版本中引入。

对Kafka集群来说,要想实现完整的安全模式,首先为集群中的每台机器生成密钥和证书是第一步,其次利用SASL对客户端进行身份验证是第二步,最后对不同客户端进行读写操作的授权是第三步,这些步骤即可以单独运作也可以同时运作,从而提高kafka集群的安全性。

三、具体实现

本文主要介绍作为kafka生产者,如何基于Kerberos进行身份认证给第三方kafka发送数据。

Kerberos主要由三个部分组成:密钥分发中心Key Distribution Center(即KDC)、客户端Client、服务端Service,大致关系图如下图2所示,其中KDC是实现身份认证的核心组件,其包含三个部分:

  1. Kerberos Database:储存用户密码以及其他信息
  2. Authentication Service(AS):进行用户身份信息验证,为客户端提供Ticket Granting Tickets(TGT)
  3. Ticket Granting Service(TGS):验证TGT,为客户端提供Service Tickets

我们作为生产者向第三方kafka发送数据,因此需要第三方提供以下安全认证文件:

  • 用户名principle:标识客户端的用户身份,也即用于登录的用户名
  • 指定用户名对应的秘钥文件xx.keytab:存储了用户的加密密码
  • 指定安全认证的服务配置文件krb5.conf:客户端根据该文件中的信息去访问KDC

获取以上安全认证文件后,即可编写java代码连接第三方kafka,步骤如下:

1、将安全认证文件xx.keytabkrb5.conf放置于某一路径下,确保后续java代码可进行读取

2、添加kafka配置文件,开启安全模式认证,其中kerberos.path是第一步中认证文件所在的目录

3、修改Kafka生产者配置,开启安全连接

4、调用认证工具类进行登录认证

LoginUtil认证工具类的核心是根据第一步中提供的安全认证文件自动生成jaas配置文件,该文件是kafka安全模式下认证的核心。代码如下:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.io.FileWriter;
import java.io.IOException;/*** @ProjectName: stdp-security-demo* @Package: * @ClassName: LoginUtil* @Author: stdp* @Description: ${description}*/
public class LoginUtil {public enum Module {KAFKA("KafkaClient"), ZOOKEEPER("Client");private String name;Module(String name) {this.name = name;}public String getName() {return name;}}private static final Logger LOGGER = LoggerFactory.getLogger(LoginUtil.class);/*** line operator string*/private static final String LINE_SEPARATOR = System.getProperty("line.separator");/*** jaas file postfix*/private static final String JAAS_POSTFIX = ".jaas.conf";private static final String JAVA_SECURITY_KRB5_CONF_KEY = "java.security.krb5.conf";public static final String JAVA_SECURITY_LOGIN_CONF_KEY = "java.security.auth.login.config";private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY = "zookeeper.server.principal";private static final boolean IS_IBM_JDK = System.getProperty("java.vendor").contains("IBM");/*** oracle jdk login module*/private static final String SUN_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule required";public synchronized static void login(String userPrincipal, String userKeytabPath, String krb5ConfPath)throws IOException{// 1.check input parametersif ((userPrincipal == null) || (userPrincipal.length() <= 0)){LOGGER.error("input userPrincipal is invalid.");throw new IOException("input userPrincipal is invalid.");}if ((userKeytabPath == null) || (userKeytabPath.length() <= 0)){LOGGER.error("input userKeytabPath is invalid.");throw new IOException("input userKeytabPath is invalid.");}if ((krb5ConfPath == null) || (krb5ConfPath.length() <= 0)){LOGGER.error("input krb5ConfPath is invalid.");throw new IOException("input krb5ConfPath is invalid.");}// 2.check file exsitsFile userKeytabFile = new File(userKeytabPath);if (!userKeytabFile.exists()){LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") does not exsit.");}if (!userKeytabFile.isFile()){LOGGER.error("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");throw new IOException("userKeytabFile(" + userKeytabFile.getAbsolutePath() + ") is not a file.");}File krb5ConfFile = new File(krb5ConfPath);if (!krb5ConfFile.exists()){LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") does not exsit.");}if (!krb5ConfFile.isFile()){LOGGER.error("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");throw new IOException("krb5ConfFile(" + krb5ConfFile.getAbsolutePath() + ") is not a file.");}// 3.set and check krb5configsetKrb5Config(krb5ConfFile.getAbsolutePath());//        LOGGER.info("check zookeeper server Principal =============================================");setZookeeperServerPrincipal(userPrincipal);
//        LOGGER.info("check jaas.conf +++++++++++++++++++++++++++++++++++++++++++++++++");setJaasFile(userPrincipal,userKeytabPath);LOGGER.info("Login success!!!!!!!!!!!!!!");}public static void setKrb5Config(String krb5ConfigFile) throws IOException {System.setProperty(JAVA_SECURITY_KRB5_CONF_KEY,krb5ConfigFile);String ret = System.getProperty(JAVA_SECURITY_KRB5_CONF_KEY);if (ret == null) {LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is null.");}if (!ret.equals(krb5ConfigFile)){LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY + " is " + ret + " is not " + krb5ConfigFile + ".");}}public static void setJaasFile(String userPrincipal,String userKeytabPath) throws IOException {String jaasPath = new File(System.getProperty("java.io.tmpdir")) + File.separator + System.getProperty("user.name") + JAAS_POSTFIX;LOGGER.info("jaasPath = {}",jaasPath);//windows路径下分隔符替换jaasPath = jaasPath.replace("\\","\\\\");userKeytabPath = userKeytabPath.replace("\\","\\\\");//删除jaas文件deleteJaasFile(jaasPath);writeJaasFile(jaasPath,userPrincipal,userKeytabPath);System.setProperty(JAVA_SECURITY_LOGIN_CONF_KEY,jaasPath);}private static void deleteJaasFile(String jaasPath) throws IOException {File jaasFile = new File(jaasPath);if (jaasFile.exists()){if (!jaasFile.delete()){throw new IOException("failed to delete exists jaas file.");}}}private static void writeJaasFile(String jaasPath,String userPrincipal,String userKeytabPath) throws IOException {FileWriter writer = new FileWriter(new File(jaasPath));try{writer.write(getJaasConfContext(userPrincipal,userKeytabPath));writer.flush();}catch (IOException e){throw new IOException("Failed to create jaas.conf File.");}finally {writer.close();}}private static String getJaasConfContext(String userPrincipal,String userKeytabPath) throws IOException{Module[] allModule = Module.values();StringBuffer builder = new StringBuffer();for (Module module: allModule){String serviceName = null;if ("Client".equals(module.getName())){serviceName = "zookeeper";}else if ("KafkaClient".equals(module.getName())){serviceName = "kafka";}builder.append(getModuleContext(userPrincipal,userKeytabPath,module,serviceName));}return builder.toString();}private static String getModuleContext(String userPrincipal,String userKeytabPath,Module module,String serviceName) throws IOException {StringBuffer builder = new StringBuffer();if (IS_IBM_JDK){builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);builder.append("credsType=both").append(LINE_SEPARATOR);builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);builder.append("useKeytab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);builder.append("debug=true;").append(LINE_SEPARATOR);builder.append("};").append(LINE_SEPARATOR);}else {builder.append(module.getName()).append(" {").append(LINE_SEPARATOR);builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);builder.append("useKeyTab=true").append(LINE_SEPARATOR);builder.append("keyTab=\"" + userKeytabPath + "\"").append(LINE_SEPARATOR);builder.append("principal=\"" + userPrincipal.trim() + "\"").append(LINE_SEPARATOR);builder.append("serviceName=\""+serviceName + "\"").append(LINE_SEPARATOR);builder.append("useTicketCache=false").append(LINE_SEPARATOR);builder.append("storeKey=true").append(LINE_SEPARATOR);builder.append("debug=true;").append(LINE_SEPARATOR);builder.append("};").append(LINE_SEPARATOR);}return builder.toString();}public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException {System.setProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY,zkServerPrincipal);String ret = System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY);if (ret == null) {LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is null.");}if (!ret.equals(zkServerPrincipal)){LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY + " is " + ret + " is not " + zkServerPrincipal + ".");}}
}

经过以上四步的配置,启动项目后即可自动连接kafka进行身份校验,若登录成功,会输出如下提示信息:Login success,并且会将生成的jaas文件路径打印出来。

四、常见问题

1、认证文件找不到

这是因为步骤1中kerberos.path配置有问题,检查path路径下是否存在认证文件keytab和krb5.conf。

2、 principal和keytab不匹配

不同的用户名对应不同的密码,在身份校验时,需保证用户名principle和密码keytab的一致性,否则无法验证通过。而principal和keytab不匹配可能存在以下两种场景:

  •  配置文件中出现问题:检查kerberos.principle和kerberos.keytab中的用户名(即hkjj)是否一致。

  •  检查生成的jaas文件中用户名和配置的用户名是否相同

如果步骤1检查没用问题,则可根据日志中输出的jaas文件路径查看自动生成的jaas文件中的principal和配置文件中的kerberos.principle是否一致。比如我的这个项目中,就是由于现场技术配置kerberos.principle时后面多打了一个空格,导致自动生成的jaas文件中的principle后多一个空格,因此和keytab认证失败。

为了彻底解决这个误打空格的问题,可以直接修改认证工具类LoginUtil,在生成jaas文件的principle时去掉可能存在的空格。

3、用户密码keytab更新,导致出现checksum failed

这是由于principal对应的密码修改了,但是程序中使用的还是旧的密码,就会出现这个问题。解决办法是找第三方提供principal对应的最新的密码文件keytab

4jaas文件找不到

该问题是由于找不到jaas.conf 这个文件导致的,而基于kerberos认证时一般不会出现,这是因为kerberos认证时jaas文件是由LoginUtil工具类根据安全认证文件自动生成并且存储在指定路径下的。

该问题通常出现在SASL-PLAIN方式的认证中,因为该方式需要添加一个配置参数java.security.auth.login.config来标识jaas文件的路径,如果文件路径出错则会报以上错误。

五、总结

在kafka身份认证的过程中,需要的principal,keytab,ServiceName等信息均配置在jaas文件中,因此保证认证的服务可以读取到正确的文件及正确的配置是kafka安全模式下认证的核心。

基于kerberos认证时,可根据安全认证文件自动生成jaas配置文件,从而保证了密码加密传输,相比于SASL-PLAIN模式更具安全性,并且认证实现过程也较为简单。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2814290.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

微信小程序的医院体检预约管理系统springboot+uniapp+python

本系统设计的目的是建立一个简化信息管理工作、便于操作的体检导引平台。共有以下四个模块&#xff1a; uni-app框架&#xff1a;使用Vue.js开发跨平台应用的前端框架&#xff0c;编写一套代码&#xff0c;可编译到Android、小程序等平台。 语言&#xff1a;pythonjavanode.js…

第十四天-网络爬虫基础

1.什么是爬虫 1.爬虫&#xff08;又被称为网页蜘蛛&#xff0c;网络机器人&#xff09;&#xff0c;是按照一定规则&#xff0c;自动的抓取万维网中的程序或者脚本&#xff0c;是搜索引擎的重要组成&#xff1b;比如&#xff1a;百度、 2.爬虫应用&#xff1a;1.搜索引擎&…

Rust使用calamine读取excel文件,Rust使用rust_xlsxwriter写入excel文件

Rust使用calamine读取已存在的test.xlsx文件全部数据&#xff0c;还读取指定单元格数据&#xff1b;Rust使用rust_xlsxwriter创建新的output.xlsx文件&#xff0c;并写入数据到指定单元格&#xff0c;然后再保存工作簿。 Cargo.toml main.rs /*rust读取excel文件*/ use cala…

Linux下性能分析的可视化图表工具

1 sar 和sadf 1.1 简介 sar命令可以记录系统下的常见活动信息&#xff0c;例如CPU使用率、网络统计数据、Block I/O数据、内存使用情况 等。 sar命令的“-o [file_name]”参数可以将系统活动数据记录到file_name文件&#xff0c;然后通过sadf来解析&#xff0c;sadf命令的“-g…

【ArcGIS】基本概念-空间参考与变换

ArcGIS基本概念-空间参考与变换 1 空间参考与地图投影1.1 空间参考1.2 大地坐标系&#xff08;地理坐标系&#xff09;1.3 投影坐标系总结 2 投影变换预处理2.1 定义投影2.2 转换自定义地理&#xff08;坐标&#xff09;变换2.3 转换坐标记法 3 投影变换3.1 矢量数据的投影变换…

wayland(xdg_wm_base) + egl + opengles 使用 Assimp 加载3D model 最简实例(十三)

文章目录 前言一、3D model 文件介绍1. 3d model 介绍1.1 如何获取3d model 文件1.2 3d model 的文件格式1.3 obj模型数据格式2. 3d 立方体 model 实例——cube.obj二、Assimp 介绍1. Assimp 简介2.ubuntu 上安装libassimp3. 使用Assimp 解析 cube.obj 文件3.1 assimp_load_cub…

【ArcGIS】统计格网中不同土地利用类型占比

基于ArcGIS统计格网中不同土地利用类型占比 数据准备ArcGIS操作步骤1、创建渔网&#xff08;Create Fishnet&#xff09;2、建立唯一标识3、选择格网4、提取不同类别土地利用类型5、各类用地面积计算 参考另&#xff1a;可能出现的问题总结Q1&#xff1a;ArcGIS获取唯一值&…

备战蓝桥杯————如何判断回文链表

如何判断回文链表 题目描述 给你一个单链表的头节点 head &#xff0c;请你判断该链表是否为回文链表。如果是&#xff0c;返回 true &#xff1b;否则&#xff0c;返回 false 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,2,1] 输出&#xff1a;true示例 2&#xff1a;…

AI学习(5):PyTorch-核心模块(Autograd):自动求导

1.介绍 在深度学习中&#xff0c;自动求导是一项核心技术&#xff0c;它使得我们能够方便地计算梯度并优化模型参数。PyTorch 提供了一个强大的自动求导模块(Autograd)&#xff0c;它可以自动计算张量的导数得出梯度信息&#xff0c;同时也支持高阶导数计算。 1.1 概念词 在学…

无人机飞行控制系统技术,四旋翼无人机控制系统建模技术详解

物理建模是四旋翼无人机控制系统建模的基础&#xff0c;主要涉及到无人机的物理特性和运动学特性。物理建模的目的是将无人机的运动与输入信号&#xff08;如控制电压&#xff09;之间的关系进行数学描述。 四旋翼无人直升机是具有四个输入力和六个坐标输出的欠驱动动力学旋翼…

Linux系统——Nginx服务——响应四级模型

stream和http模块是同级 [rootnode2 html]#yum install epel-release.noarch -y [rootnode2 html]#yum install redis -y [rootnode2 html]#systemctl start redis[rootnode3 html]#yum install epel-release.noarch -y [rootnode3 html]#yum install redis -y [rootnode3 html…

基于java+IDEA+Mysql开发的SSM+旅游后台管理系统

基于javaIDEAMysql开发的SSM旅游后台管理系统 项目介绍&#x1f481;&#x1f3fb; 旅游后台管理系统是一款应用于旅游后台管理的业务系统&#xff0c;实现旅游工作内容可视化、旅游管理专业化、业务评估数字化&#xff0c;从而提高旅游后台数据管理的高效性。 随着互联网时代的…

自动驾驶消息传输机制-LCM

需要用到LCM消息通讯&#xff0c;遂研究下。 这里写目录标题 1 LCM简介2. LCM源码分析3 LCM C教程与实例3.1 安装配置及介绍3.2 创建类型定义3.3 初始化LCM3.4 发布publish一个消息3.5 订阅和接收一个消息3.6 LCM进程间通讯3.7 注意事项&#xff1f;3.7.1 当数据结构定义的是数…

【ArcGIS】重采样栅格像元匹配问题:不同空间分辨率栅格数据统一

重采样栅格像元匹配问题&#xff1a;不同空间分辨率栅格数据统一 原始数据数据1&#xff1a;GDP分布数据2.1&#xff1a;人口密度数据2.2&#xff1a;人口总数数据3&#xff1a;土地利用类型 数据处理操作1&#xff1a;将人口密度数据投影至GDP数据&#xff08;栅格数据的投影变…

阿里云定价_ECS产品价格_云服务器收费标准 - 阿里云官方活动

2024年最新阿里云服务器租用费用优惠价格表&#xff0c;轻量2核2G3M带宽轻量服务器一年61元&#xff0c;折合5元1个月&#xff0c;新老用户同享99元一年服务器&#xff0c;2核4G5M服务器ECS优惠价199元一年&#xff0c;2核4G4M轻量服务器165元一年&#xff0c;2核4G服务器30元3…

Docker 常用操作命令备忘

Docker 一旦设置好了环境&#xff0c;日常就只要使用简单命令就可以运行和停止。 于是&#xff0c;我每次用的时候&#xff0c;都想不起来一些关键性的命令到底怎么用&#xff0c;特此记录。 一、镜像管理 从公有仓库拉取镜像 &#xff08;对于使用苹果电脑 M1/M2/M3 芯片的 …

Java玩转《啊哈算法》之模拟链表

人应该支配习惯&#xff0c;而绝不是让习惯支配人。一个人要是不能改掉坏习惯&#xff0c;那么他就一文不值。 目录 缘代码地址模拟链表创建遍历打印插入插入优化 完整代码 缘 各位小伙伴们好呀&#xff01;本人最近看了下《啊哈算法》&#xff0c;写的确实不错。 但稍显遗憾…

在Golang中简化日志记录:提升性能和调试效率

最大化效率和有效故障排除&#xff1a;在Golang中简化日志记录 日志记录是软件开发的一个基本方面&#xff0c;有助于调试、监控和理解应用程序的流程。在Golang中&#xff0c;有效的日志记录实践可以显著提高性能并简化调试过程。本文探讨了优化Golang日志记录的技术&#xf…

Android WebView实现网页滚动截图

WebView 网页滚动截屏&#xff0c;可对整个网页进行截屏而不是仅当前屏幕哦&#xff01; 注意若Web页面存在position:fixed; 的话得在调用前设置为 position:absolute; 哦&#xff0c;否则会出现很多次的&#xff0c;请看下面的具体解说吧&#xff01;&#xff01; private s…

一周学会Django5 Python Web开发-Django5列表视图ListView

锋哥原创的Python Web开发 Django5视频教程&#xff1a; 2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 Django5 Python web开发 视频教程(无废话版) 玩命更新中~共计27条视频&#xff0c;包括&#xff1a;2024版 Django5 Python we…