Kafka SASL_SSL双重认证
文章目录
- 1. 背景
- 2. 环境
- 3. 操作步骤
- 3.1 生成SSL证书
- 3.2 配置zookeeper认证
- 3.3 配置kafka安全认证
- 3.4 使用kafka客户端进行验证
- 3.5 使用Java端代码进行认证
1. 背景
kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。
- SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
- SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。
在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。
因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。
2. 环境
- 操作系统:linux
- kafka版本:kafka_2.13-2.7.1
- zookeeper版本:apache-zookeeper-3.7.0
- 应用程序版本:spring-boot-2.6.7、JDK1.8
3. 操作步骤
- 生成SSL证书
- 配置zookeeper
- 配置kafka
- 前三步配置完成后kafka就开启了SASL_SSL双重认证,可以使用kafka自带的客户端进行测试(3.4),
- 在业务代码中使用请查看(3.5)
3.1 生成SSL证书
按照步骤一步一步操作,生成服务器/客户端的SSL证书。也就是公钥与私钥
参考:【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)
3.2 配置zookeeper认证
第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:
Server {org.apache.zookeeper.server.auth.DigestLoginModule requireduser_admin="1qaz@WSX"user_kafka="1qaz@WSX";
};
第二步: zookeeper配置文件zoo.cfg中新增SASL认证配置,如下:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
第三步: 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数,让其启动时加载jaas配置文件
export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/home/crbt/local/apache-zookeeper-3.7.0/conf/kafka_zk_jaas.conf"

3.3 配置kafka安全认证
第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:
kafka-server-jaas.conf
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="1qaz@WSX"user_admin="1qaz@WSX"user_kafka="1qaz@WSX";
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="1qaz@WSX";
};
kafka-client-jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="1qaz@WSX";
};
第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:
增加环境变量: -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf
...if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf"
fi...
**第三步:**修改 kafka 的 server.properties配置文件
#listeners=SSL://10.1.61.121:9092
host.name=node1
#listeners=PLAINTEXT://node1:9092,SSL://node1:9093
listeners=SASL_SSL://node1:9093
#advertised.listeners=SSL://node1:9092
advertised.listeners=SASL_SSL://node1:9093
ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks
ssl.keystore.password=Q06688
ssl.key.password=Q06688
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=
# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
注意:这里有个坑,生成SSL密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname
3.4 使用kafka客户端进行验证
第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置SASL_SSL验证的基本信息。
consumer.properties:
bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
producer.properties:
bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
第二步: 使用命令行操作时,让其找到上述设置的SASL_SSL配置文件( --producer.config …/config/producer.properties)
#生产
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093 --topic first --producer.config ../config/producer.properties
>aaa
>bbb
>ccc
>#消费
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties
aaa
bbb
ccc
3.5 使用Java端代码进行认证
第一步: yaml 配置文件
spring:kafka:bootstrap-servers: localhost:9093properties:sasl:mechanism: PLAINjaas:#此处填写 SASL登录时分配的用户名密码(注意password结尾;)config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";security:protocol: SASL_SSLssl:trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jkstrust-store-password: Q06688key-store-type: JKSproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 106384acks: -1retries: 3properties:linger-ms: 1retry.backoff.ms: 1000buffer-memory: 33554432
第二步: 使用 kafkaTemplate 的方式,配置一个 config
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.retries}")private String retries;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.properties.linger-ms}")private int lingerMs;@Value("${spring.kafka.producer.properties.buffer-memory}")private int bufferMemory;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Value("${spring.kafka.properties.security.protocol}")private String protocol;@Value("${spring.kafka.properties.sasl.mechanism}")private String mechanism;@Value("${spring.kafka.ssl.trust-store-location}")private String trustStoreLocation;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-type}")private String keyStoreType;@Value("${spring.kafka.properties.sasl.jaas.config}")private String jaasConfig;@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}/*** the producer factory config*/@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<String, Object>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.ACKS_CONFIG, acks);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put("security.protocol", protocol);props.put(SaslConfigs.SASL_MECHANISM, mechanism);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType);props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);return new DefaultKafkaProducerFactory<String, String>(props);}
}
相关文章:
Kafka SASL_SSL双重认证
文章目录 1. 背景2. 环境3. 操作步骤3.1 生成SSL证书3.2 配置zookeeper认证3.3 配置kafka安全认证3.4 使用kafka客户端进行验证3.5 使用Java端代码进行认证 1. 背景 kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。 SASL: 是一种身份验证机制&…...
css新手教程
css新手教程 课程:14、盒子模型及边框使用_哔哩哔哩_bilibili 一.什么是CSS 1.什么是CSS Cascading Style Sheet 层叠样式表。 CSS:表现(美化网页) 字体,颜色,边距,高度,宽度&am…...
spring boot(2.4.x之前版本)和spring cloud项目中配置文件的作用
spring 版本以及相关的组件一直在变化,其中一些类或者功能在低版本中有,高版本中去掉了,有的新功能只在高版本有。 为了防止理解问题,pom.xml 版本依赖如下 <parent><groupId>org.springframework.boot</groupId…...
web前后端小坑记录
游戏服务器过年这段时间忙完了,好久没看web了,重温一下。发现竟然没有文章记录这些修BUG的过程,记录一下。 目录 如何处理F5刷新? 如何处理F5刷新? 后端应该发现路由不存在,直接返回打包好的index.html就…...
股票K线简介
股票K线(K-Line)是用于表示股票价格走势的图形,主要由四个关键价格点组成:开盘价、收盘价、最高价和最低价。K线图广泛应用于股票市场技术分析中,它提供了丰富的信息,帮助分析师和投资者理解市场的行情走势…...
路由器、路由器的构成、交换结构
目录 1 路由器 1.1 路由器的结构 “转发”和“路由选择”的区别 1.1.1 输入端口对线路上收到的分组的处理 1.1.2 输出端口将交换结构传送来的分组发送到线路 2.2 交换结构 2.2.1 通过存储器 2.2.2 通过总线 2.2.3 通过纵横交换结构 (crossbar switch fabric) 1 路由器…...
【Mysql】整理
Mysql整理与总结 整理Mysql的基本内容供回顾。 参考: [1]. 掘金.MySQL三大日志(binlog,redolog,undolog)详解 [2]. Javaguide.MySQL三大日志(binlog、redo log和undo log)详解...
项目02《游戏-08-开发》Unity3D
基于 项目02《游戏-07-开发》Unity3D , 本次任务做物品相互与详情的功能, 首先要做 点击相应, 接下来用接口实现点击相应事件,具体到代码中,我们找到需要响应鼠标事件的对象, 双击PackageCell…...
【数据库原理及应用】简答题归纳总结
第一章 数据库概论 1.人工管理阶段数据管理的特点: (1)数据不保存在机器中 (2)无专用的软件对数据进行管理 (3)只有程序的概念,没有文件的概念 (4)数据面向程…...
通过无线打通两个路由器
通过无线打通两个路由器 上网向导无线连接 配置比较简单,有些路由器支持有些不支持,支持的大致就是下面的方法,不过不同型号面板不一样,这里主要学习方法,所以不做路由器型号介绍。 重要的事情说三遍:学习要…...
idea 配置文件,中文出现乱码如何解决
在进行 spring 项目开发时,项目中有 application.properties/application.yml 等配置文件,在配置文件中使用中文注解时可能会出现乱码的情况,如下: 这是因为 idea 配置文件的编码和其他文件的不同,我们需要修改配置文件…...
网络协议梳理
1 引言 在计算机网络中要做到有条不紊地交换数据,就必须遵守一些事先约定好的规则。这些规则明确规定了所交换的数据的格式以及有关的同步问题。这里所说的同步不是狭义的(即同频或同频同相)而是广义的,即在一定的条件下应当发生什…...
14. 【Linux教程】文件压缩与解压
文件压缩与解压 前面小节介绍了如何对文件和目录删除、移动操作,本小节介绍如何使用命令对文件和目录进行压缩与解压操作,常见的压缩包格式有 .bz2、.Z、.gz、.zip、.xz,压缩之后的文件或目录占用更少的空间。 1. tar 命令介绍 下面列举 ta…...
ruoyi-nbcio中xxl-job的安装与使用
更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码: https://gitee.com/nbacheng/ruoyi-nbcio 演示地址: http://122.227.135.243:9666 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码: https://gitee.com/nbach…...
从零学算法162
162.峰值元素是指其值严格大于左右相邻值的元素。 给你一个整数数组 nums,找到峰值元素并返回其索引。数组可能包含多个峰值,在这种情况下,返回 任何一个峰值 所在位置即可。 你可以假设 nums[-1] nums[n] -∞ 。 你必须实现时间复杂度为 O…...
5.0 ZooKeeper 数据模型 znode 结构详解
数据模型 在 zookeeper 中,可以说 zookeeper 中的所有存储的数据是由 znode 组成的,节点也称为 znode,并以 key/value 形式存储数据。 整体结构类似于 linux 文件系统的模式以树形结构存储。其中根路径以 / 开头。 进入 zookeeper 安装的 …...
《数电》理论笔记-第1章-逻辑代数基础
参考:视频 和 《数字电路与逻辑设计》 电子书 一,第1章 逻辑代数基础 1 数字量和模拟量 略 2 数制(十进制,二进制,八进制和十六进制) 拨电话(BoDH)---(2八10十六&…...
计算指定路径下的可用空间大小
方法一、使用psutil库 import psutildef check_disk_space(path):usage psutil.disk_usage(path)## 1GB 1 * 1024 * 1024 * 1024字节if usage.free > 1 * 1024 * 1024 * 1024:return 1else:return 0disk_path "/home" result check_disk_space(disk_path) pr…...
2023年全球软件架构师峰会(ArchSummit上海站):核心内容与学习收获(附大会核心PPT下载)
微服务架构是当今软件架构的主流趋势之一。随着云计算和分布式系统的普及,越来越多的企业开始采用微服务架构来构建他们的应用。微服务架构可以将一个大型的应用拆分成多个小型的服务,每个服务都独立部署、独立运行,并通过轻量级的通信协议进…...
踩坑实录(Second Day)
作为公司的小菜鸟,每天都踩坑应该是一件很正常的事情吧,哈哈哈。今天遇到了比较棘手的问题,以前从来没有遇到过。然后就是在某平台上接的一个 bug 修改的单子,也拿出来和大家分享一下~ 此为第二篇(2024 年 02 月 05 日…...
线程与协程
1. 线程与协程 1.1. “函数调用级别”的切换、上下文切换 1. 函数调用级别的切换 “函数调用级别的切换”是指:像函数调用/返回一样轻量地完成任务切换。 举例说明: 当你在程序中写一个函数调用: funcA() 然后 funcA 执行完后返回&…...
[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?
论文网址:pdf 英文是纯手打的!论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误,若有发现欢迎评论指正!文章偏向于笔记,谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...
人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式
今天是关于AI如何在教学中增强学生的学习体验,我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育,这并非炒作,而是已经发生的巨大变革。教育机构和教育者不能忽视它,试图简单地禁止学生使…...
【笔记】WSL 中 Rust 安装与测试完整记录
#工作记录 WSL 中 Rust 安装与测试完整记录 1. 运行环境 系统:Ubuntu 24.04 LTS (WSL2)架构:x86_64 (GNU/Linux)Rust 版本:rustc 1.87.0 (2025-05-09)Cargo 版本:cargo 1.87.0 (2025-05-06) 2. 安装 Rust 2.1 使用 Rust 官方安…...
AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别
【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势…...
vulnyx Blogger writeup
信息收集 arp-scan nmap 获取userFlag 上web看看 一个默认的页面,gobuster扫一下目录 可以看到扫出的目录中得到了一个有价值的目录/wordpress,说明目标所使用的cms是wordpress,访问http://192.168.43.213/wordpress/然后查看源码能看到 这…...
【JVM】Java虚拟机(二)——垃圾回收
目录 一、如何判断对象可以回收 (一)引用计数法 (二)可达性分析算法 二、垃圾回收算法 (一)标记清除 (二)标记整理 (三)复制 (四ÿ…...
Python+ZeroMQ实战:智能车辆状态监控与模拟模式自动切换
目录 关键点 技术实现1 技术实现2 摘要: 本文将介绍如何利用Python和ZeroMQ消息队列构建一个智能车辆状态监控系统。系统能够根据时间策略自动切换驾驶模式(自动驾驶、人工驾驶、远程驾驶、主动安全),并通过实时消息推送更新车…...
MySQL 部分重点知识篇
一、数据库对象 1. 主键 定义 :主键是用于唯一标识表中每一行记录的字段或字段组合。它具有唯一性和非空性特点。 作用 :确保数据的完整性,便于数据的查询和管理。 示例 :在学生信息表中,学号可以作为主键ÿ…...
【 java 虚拟机知识 第一篇 】
目录 1.内存模型 1.1.JVM内存模型的介绍 1.2.堆和栈的区别 1.3.栈的存储细节 1.4.堆的部分 1.5.程序计数器的作用 1.6.方法区的内容 1.7.字符串池 1.8.引用类型 1.9.内存泄漏与内存溢出 1.10.会出现内存溢出的结构 1.内存模型 1.1.JVM内存模型的介绍 内存模型主要分…...
