Kafka SASL_SSL双重认证
文章目录
- 1. 背景
- 2. 环境
- 3. 操作步骤
- 3.1 生成SSL证书
- 3.2 配置zookeeper认证
- 3.3 配置kafka安全认证
- 3.4 使用kafka客户端进行验证
- 3.5 使用Java端代码进行认证
1. 背景
kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。
- SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
- SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。
在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。
因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。
2. 环境
- 操作系统:linux
- kafka版本:kafka_2.13-2.7.1
- zookeeper版本:apache-zookeeper-3.7.0
- 应用程序版本:spring-boot-2.6.7、JDK1.8
3. 操作步骤
- 生成SSL证书
- 配置zookeeper
- 配置kafka
- 前三步配置完成后kafka就开启了SASL_SSL双重认证,可以使用kafka自带的客户端进行测试(3.4),
- 在业务代码中使用请查看(3.5)
3.1 生成SSL证书
按照步骤一步一步操作,生成服务器/客户端的SSL证书。也就是公钥与私钥
参考:【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)
3.2 配置zookeeper认证
第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:
Server {org.apache.zookeeper.server.auth.DigestLoginModule requireduser_admin="1qaz@WSX"user_kafka="1qaz@WSX";
};
第二步: zookeeper配置文件zoo.cfg中新增SASL认证配置,如下:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
第三步: 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数,让其启动时加载jaas配置文件
export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/home/crbt/local/apache-zookeeper-3.7.0/conf/kafka_zk_jaas.conf"
3.3 配置kafka安全认证
第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:
kafka-server-jaas.conf
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="1qaz@WSX"user_admin="1qaz@WSX"user_kafka="1qaz@WSX";
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="1qaz@WSX";
};
kafka-client-jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="1qaz@WSX";
};
第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:
增加环境变量: -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf
...if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf"
fi...
**第三步:**修改 kafka 的 server.properties配置文件
#listeners=SSL://10.1.61.121:9092
host.name=node1
#listeners=PLAINTEXT://node1:9092,SSL://node1:9093
listeners=SASL_SSL://node1:9093
#advertised.listeners=SSL://node1:9092
advertised.listeners=SASL_SSL://node1:9093
ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks
ssl.keystore.password=Q06688
ssl.key.password=Q06688
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=
# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
注意:这里有个坑,生成SSL密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname
3.4 使用kafka客户端进行验证
第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置SASL_SSL验证的基本信息。
consumer.properties:
bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
producer.properties:
bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
第二步: 使用命令行操作时,让其找到上述设置的SASL_SSL配置文件( --producer.config …/config/producer.properties)
#生产
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093 --topic first --producer.config ../config/producer.properties
>aaa
>bbb
>ccc
>#消费
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties
aaa
bbb
ccc
3.5 使用Java端代码进行认证
第一步: yaml 配置文件
spring:kafka:bootstrap-servers: localhost:9093properties:sasl:mechanism: PLAINjaas:#此处填写 SASL登录时分配的用户名密码(注意password结尾;)config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";security:protocol: SASL_SSLssl:trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jkstrust-store-password: Q06688key-store-type: JKSproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 106384acks: -1retries: 3properties:linger-ms: 1retry.backoff.ms: 1000buffer-memory: 33554432
第二步: 使用 kafkaTemplate 的方式,配置一个 config
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.retries}")private String retries;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.properties.linger-ms}")private int lingerMs;@Value("${spring.kafka.producer.properties.buffer-memory}")private int bufferMemory;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Value("${spring.kafka.properties.security.protocol}")private String protocol;@Value("${spring.kafka.properties.sasl.mechanism}")private String mechanism;@Value("${spring.kafka.ssl.trust-store-location}")private String trustStoreLocation;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-type}")private String keyStoreType;@Value("${spring.kafka.properties.sasl.jaas.config}")private String jaasConfig;@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}/*** the producer factory config*/@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<String, Object>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.ACKS_CONFIG, acks);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put("security.protocol", protocol);props.put(SaslConfigs.SASL_MECHANISM, mechanism);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType);props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);return new DefaultKafkaProducerFactory<String, String>(props);}
}
相关文章:

Kafka SASL_SSL双重认证
文章目录 1. 背景2. 环境3. 操作步骤3.1 生成SSL证书3.2 配置zookeeper认证3.3 配置kafka安全认证3.4 使用kafka客户端进行验证3.5 使用Java端代码进行认证 1. 背景 kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。 SASL: 是一种身份验证机制&…...

css新手教程
css新手教程 课程:14、盒子模型及边框使用_哔哩哔哩_bilibili 一.什么是CSS 1.什么是CSS Cascading Style Sheet 层叠样式表。 CSS:表现(美化网页) 字体,颜色,边距,高度,宽度&am…...

spring boot(2.4.x之前版本)和spring cloud项目中配置文件的作用
spring 版本以及相关的组件一直在变化,其中一些类或者功能在低版本中有,高版本中去掉了,有的新功能只在高版本有。 为了防止理解问题,pom.xml 版本依赖如下 <parent><groupId>org.springframework.boot</groupId…...

web前后端小坑记录
游戏服务器过年这段时间忙完了,好久没看web了,重温一下。发现竟然没有文章记录这些修BUG的过程,记录一下。 目录 如何处理F5刷新? 如何处理F5刷新? 后端应该发现路由不存在,直接返回打包好的index.html就…...

股票K线简介
股票K线(K-Line)是用于表示股票价格走势的图形,主要由四个关键价格点组成:开盘价、收盘价、最高价和最低价。K线图广泛应用于股票市场技术分析中,它提供了丰富的信息,帮助分析师和投资者理解市场的行情走势…...

路由器、路由器的构成、交换结构
目录 1 路由器 1.1 路由器的结构 “转发”和“路由选择”的区别 1.1.1 输入端口对线路上收到的分组的处理 1.1.2 输出端口将交换结构传送来的分组发送到线路 2.2 交换结构 2.2.1 通过存储器 2.2.2 通过总线 2.2.3 通过纵横交换结构 (crossbar switch fabric) 1 路由器…...

【Mysql】整理
Mysql整理与总结 整理Mysql的基本内容供回顾。 参考: [1]. 掘金.MySQL三大日志(binlog,redolog,undolog)详解 [2]. Javaguide.MySQL三大日志(binlog、redo log和undo log)详解...

项目02《游戏-08-开发》Unity3D
基于 项目02《游戏-07-开发》Unity3D , 本次任务做物品相互与详情的功能, 首先要做 点击相应, 接下来用接口实现点击相应事件,具体到代码中,我们找到需要响应鼠标事件的对象, 双击PackageCell…...

【数据库原理及应用】简答题归纳总结
第一章 数据库概论 1.人工管理阶段数据管理的特点: (1)数据不保存在机器中 (2)无专用的软件对数据进行管理 (3)只有程序的概念,没有文件的概念 (4)数据面向程…...

通过无线打通两个路由器
通过无线打通两个路由器 上网向导无线连接 配置比较简单,有些路由器支持有些不支持,支持的大致就是下面的方法,不过不同型号面板不一样,这里主要学习方法,所以不做路由器型号介绍。 重要的事情说三遍:学习要…...

idea 配置文件,中文出现乱码如何解决
在进行 spring 项目开发时,项目中有 application.properties/application.yml 等配置文件,在配置文件中使用中文注解时可能会出现乱码的情况,如下: 这是因为 idea 配置文件的编码和其他文件的不同,我们需要修改配置文件…...

网络协议梳理
1 引言 在计算机网络中要做到有条不紊地交换数据,就必须遵守一些事先约定好的规则。这些规则明确规定了所交换的数据的格式以及有关的同步问题。这里所说的同步不是狭义的(即同频或同频同相)而是广义的,即在一定的条件下应当发生什…...

14. 【Linux教程】文件压缩与解压
文件压缩与解压 前面小节介绍了如何对文件和目录删除、移动操作,本小节介绍如何使用命令对文件和目录进行压缩与解压操作,常见的压缩包格式有 .bz2、.Z、.gz、.zip、.xz,压缩之后的文件或目录占用更少的空间。 1. tar 命令介绍 下面列举 ta…...

ruoyi-nbcio中xxl-job的安装与使用
更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码: https://gitee.com/nbacheng/ruoyi-nbcio 演示地址: http://122.227.135.243:9666 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码: https://gitee.com/nbach…...
从零学算法162
162.峰值元素是指其值严格大于左右相邻值的元素。 给你一个整数数组 nums,找到峰值元素并返回其索引。数组可能包含多个峰值,在这种情况下,返回 任何一个峰值 所在位置即可。 你可以假设 nums[-1] nums[n] -∞ 。 你必须实现时间复杂度为 O…...

5.0 ZooKeeper 数据模型 znode 结构详解
数据模型 在 zookeeper 中,可以说 zookeeper 中的所有存储的数据是由 znode 组成的,节点也称为 znode,并以 key/value 形式存储数据。 整体结构类似于 linux 文件系统的模式以树形结构存储。其中根路径以 / 开头。 进入 zookeeper 安装的 …...

《数电》理论笔记-第1章-逻辑代数基础
参考:视频 和 《数字电路与逻辑设计》 电子书 一,第1章 逻辑代数基础 1 数字量和模拟量 略 2 数制(十进制,二进制,八进制和十六进制) 拨电话(BoDH)---(2八10十六&…...
计算指定路径下的可用空间大小
方法一、使用psutil库 import psutildef check_disk_space(path):usage psutil.disk_usage(path)## 1GB 1 * 1024 * 1024 * 1024字节if usage.free > 1 * 1024 * 1024 * 1024:return 1else:return 0disk_path "/home" result check_disk_space(disk_path) pr…...

2023年全球软件架构师峰会(ArchSummit上海站):核心内容与学习收获(附大会核心PPT下载)
微服务架构是当今软件架构的主流趋势之一。随着云计算和分布式系统的普及,越来越多的企业开始采用微服务架构来构建他们的应用。微服务架构可以将一个大型的应用拆分成多个小型的服务,每个服务都独立部署、独立运行,并通过轻量级的通信协议进…...
踩坑实录(Second Day)
作为公司的小菜鸟,每天都踩坑应该是一件很正常的事情吧,哈哈哈。今天遇到了比较棘手的问题,以前从来没有遇到过。然后就是在某平台上接的一个 bug 修改的单子,也拿出来和大家分享一下~ 此为第二篇(2024 年 02 月 05 日…...
synchronized 学习
学习源: https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖,也要考虑性能问题(场景) 2.常见面试问题: sync出…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别
一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享
文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的,根据Excel列的需求预估的工时直接打骨折,不要问我为什么,主要…...
【论文笔记】若干矿井粉尘检测算法概述
总的来说,传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度,通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...
spring:实例工厂方法获取bean
spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂ÿ…...
【Go】3、Go语言进阶与依赖管理
前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课,做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程,它的核心机制是 Goroutine 协程、Channel 通道,并基于CSP(Communicating Sequential Processes࿰…...

Mac软件卸载指南,简单易懂!
刚和Adobe分手,它却总在Library里给你写"回忆录"?卸载的Final Cut Pro像电子幽灵般阴魂不散?总是会有残留文件,别慌!这份Mac软件卸载指南,将用最硬核的方式教你"数字分手术"࿰…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互
引擎版本: 3.8.1 语言: JavaScript/TypeScript、C、Java 环境:Window 参考:Java原生反射机制 您好,我是鹤九日! 回顾 在上篇文章中:CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...