Kafka SASL_SSL双重认证
文章目录
- 1. 背景
- 2. 环境
- 3. 操作步骤
- 3.1 生成SSL证书
- 3.2 配置zookeeper认证
- 3.3 配置kafka安全认证
- 3.4 使用kafka客户端进行验证
- 3.5 使用Java端代码进行认证
1. 背景
kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。
- SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
- SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。
在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。
因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。
2. 环境
- 操作系统:linux
- kafka版本:kafka_2.13-2.7.1
- zookeeper版本:apache-zookeeper-3.7.0
- 应用程序版本:spring-boot-2.6.7、JDK1.8
3. 操作步骤
- 生成SSL证书
- 配置zookeeper
- 配置kafka
- 前三步配置完成后kafka就开启了SASL_SSL双重认证,可以使用kafka自带的客户端进行测试(3.4),
- 在业务代码中使用请查看(3.5)
3.1 生成SSL证书
按照步骤一步一步操作,生成服务器/客户端的SSL证书。也就是公钥与私钥
参考:【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)
3.2 配置zookeeper认证
第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:
Server {org.apache.zookeeper.server.auth.DigestLoginModule requireduser_admin="1qaz@WSX"user_kafka="1qaz@WSX";
};
第二步: zookeeper配置文件zoo.cfg中新增SASL认证配置,如下:
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
第三步: 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数,让其启动时加载jaas配置文件
export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/home/crbt/local/apache-zookeeper-3.7.0/conf/kafka_zk_jaas.conf"

3.3 配置kafka安全认证
第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:
kafka-server-jaas.conf
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="1qaz@WSX"user_admin="1qaz@WSX"user_kafka="1qaz@WSX";
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="1qaz@WSX";
};
kafka-client-jaas.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="1qaz@WSX";
};
第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:
增加环境变量: -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf
...if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf"
fi...
**第三步:**修改 kafka 的 server.properties配置文件
#listeners=SSL://10.1.61.121:9092
host.name=node1
#listeners=PLAINTEXT://node1:9092,SSL://node1:9093
listeners=SASL_SSL://node1:9093
#advertised.listeners=SSL://node1:9092
advertised.listeners=SASL_SSL://node1:9093
ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks
ssl.keystore.password=Q06688
ssl.key.password=Q06688
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS
ssl.truststore.type=JKS
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=
# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true
注意:这里有个坑,生成SSL密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname
3.4 使用kafka客户端进行验证
第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置SASL_SSL验证的基本信息。
consumer.properties:
bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
producer.properties:
bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";
第二步: 使用命令行操作时,让其找到上述设置的SASL_SSL配置文件( --producer.config …/config/producer.properties)
#生产
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093 --topic first --producer.config ../config/producer.properties
>aaa
>bbb
>ccc
>#消费
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties
aaa
bbb
ccc
3.5 使用Java端代码进行认证
第一步: yaml 配置文件
spring:kafka:bootstrap-servers: localhost:9093properties:sasl:mechanism: PLAINjaas:#此处填写 SASL登录时分配的用户名密码(注意password结尾;)config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";security:protocol: SASL_SSLssl:trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jkstrust-store-password: Q06688key-store-type: JKSproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 106384acks: -1retries: 3properties:linger-ms: 1retry.backoff.ms: 1000buffer-memory: 33554432
第二步: 使用 kafkaTemplate 的方式,配置一个 config
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.retries}")private String retries;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.properties.linger-ms}")private int lingerMs;@Value("${spring.kafka.producer.properties.buffer-memory}")private int bufferMemory;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Value("${spring.kafka.properties.security.protocol}")private String protocol;@Value("${spring.kafka.properties.sasl.mechanism}")private String mechanism;@Value("${spring.kafka.ssl.trust-store-location}")private String trustStoreLocation;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-type}")private String keyStoreType;@Value("${spring.kafka.properties.sasl.jaas.config}")private String jaasConfig;@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}/*** the producer factory config*/@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<String, Object>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.ACKS_CONFIG, acks);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put("security.protocol", protocol);props.put(SaslConfigs.SASL_MECHANISM, mechanism);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType);props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);return new DefaultKafkaProducerFactory<String, String>(props);}
}
相关文章:
Kafka SASL_SSL双重认证
文章目录 1. 背景2. 环境3. 操作步骤3.1 生成SSL证书3.2 配置zookeeper认证3.3 配置kafka安全认证3.4 使用kafka客户端进行验证3.5 使用Java端代码进行认证 1. 背景 kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。 SASL: 是一种身份验证机制&…...
css新手教程
css新手教程 课程:14、盒子模型及边框使用_哔哩哔哩_bilibili 一.什么是CSS 1.什么是CSS Cascading Style Sheet 层叠样式表。 CSS:表现(美化网页) 字体,颜色,边距,高度,宽度&am…...
spring boot(2.4.x之前版本)和spring cloud项目中配置文件的作用
spring 版本以及相关的组件一直在变化,其中一些类或者功能在低版本中有,高版本中去掉了,有的新功能只在高版本有。 为了防止理解问题,pom.xml 版本依赖如下 <parent><groupId>org.springframework.boot</groupId…...
web前后端小坑记录
游戏服务器过年这段时间忙完了,好久没看web了,重温一下。发现竟然没有文章记录这些修BUG的过程,记录一下。 目录 如何处理F5刷新? 如何处理F5刷新? 后端应该发现路由不存在,直接返回打包好的index.html就…...
股票K线简介
股票K线(K-Line)是用于表示股票价格走势的图形,主要由四个关键价格点组成:开盘价、收盘价、最高价和最低价。K线图广泛应用于股票市场技术分析中,它提供了丰富的信息,帮助分析师和投资者理解市场的行情走势…...
路由器、路由器的构成、交换结构
目录 1 路由器 1.1 路由器的结构 “转发”和“路由选择”的区别 1.1.1 输入端口对线路上收到的分组的处理 1.1.2 输出端口将交换结构传送来的分组发送到线路 2.2 交换结构 2.2.1 通过存储器 2.2.2 通过总线 2.2.3 通过纵横交换结构 (crossbar switch fabric) 1 路由器…...
【Mysql】整理
Mysql整理与总结 整理Mysql的基本内容供回顾。 参考: [1]. 掘金.MySQL三大日志(binlog,redolog,undolog)详解 [2]. Javaguide.MySQL三大日志(binlog、redo log和undo log)详解...
项目02《游戏-08-开发》Unity3D
基于 项目02《游戏-07-开发》Unity3D , 本次任务做物品相互与详情的功能, 首先要做 点击相应, 接下来用接口实现点击相应事件,具体到代码中,我们找到需要响应鼠标事件的对象, 双击PackageCell…...
【数据库原理及应用】简答题归纳总结
第一章 数据库概论 1.人工管理阶段数据管理的特点: (1)数据不保存在机器中 (2)无专用的软件对数据进行管理 (3)只有程序的概念,没有文件的概念 (4)数据面向程…...
通过无线打通两个路由器
通过无线打通两个路由器 上网向导无线连接 配置比较简单,有些路由器支持有些不支持,支持的大致就是下面的方法,不过不同型号面板不一样,这里主要学习方法,所以不做路由器型号介绍。 重要的事情说三遍:学习要…...
idea 配置文件,中文出现乱码如何解决
在进行 spring 项目开发时,项目中有 application.properties/application.yml 等配置文件,在配置文件中使用中文注解时可能会出现乱码的情况,如下: 这是因为 idea 配置文件的编码和其他文件的不同,我们需要修改配置文件…...
网络协议梳理
1 引言 在计算机网络中要做到有条不紊地交换数据,就必须遵守一些事先约定好的规则。这些规则明确规定了所交换的数据的格式以及有关的同步问题。这里所说的同步不是狭义的(即同频或同频同相)而是广义的,即在一定的条件下应当发生什…...
14. 【Linux教程】文件压缩与解压
文件压缩与解压 前面小节介绍了如何对文件和目录删除、移动操作,本小节介绍如何使用命令对文件和目录进行压缩与解压操作,常见的压缩包格式有 .bz2、.Z、.gz、.zip、.xz,压缩之后的文件或目录占用更少的空间。 1. tar 命令介绍 下面列举 ta…...
ruoyi-nbcio中xxl-job的安装与使用
更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码: https://gitee.com/nbacheng/ruoyi-nbcio 演示地址: http://122.227.135.243:9666 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码: https://gitee.com/nbach…...
从零学算法162
162.峰值元素是指其值严格大于左右相邻值的元素。 给你一个整数数组 nums,找到峰值元素并返回其索引。数组可能包含多个峰值,在这种情况下,返回 任何一个峰值 所在位置即可。 你可以假设 nums[-1] nums[n] -∞ 。 你必须实现时间复杂度为 O…...
5.0 ZooKeeper 数据模型 znode 结构详解
数据模型 在 zookeeper 中,可以说 zookeeper 中的所有存储的数据是由 znode 组成的,节点也称为 znode,并以 key/value 形式存储数据。 整体结构类似于 linux 文件系统的模式以树形结构存储。其中根路径以 / 开头。 进入 zookeeper 安装的 …...
《数电》理论笔记-第1章-逻辑代数基础
参考:视频 和 《数字电路与逻辑设计》 电子书 一,第1章 逻辑代数基础 1 数字量和模拟量 略 2 数制(十进制,二进制,八进制和十六进制) 拨电话(BoDH)---(2八10十六&…...
计算指定路径下的可用空间大小
方法一、使用psutil库 import psutildef check_disk_space(path):usage psutil.disk_usage(path)## 1GB 1 * 1024 * 1024 * 1024字节if usage.free > 1 * 1024 * 1024 * 1024:return 1else:return 0disk_path "/home" result check_disk_space(disk_path) pr…...
2023年全球软件架构师峰会(ArchSummit上海站):核心内容与学习收获(附大会核心PPT下载)
微服务架构是当今软件架构的主流趋势之一。随着云计算和分布式系统的普及,越来越多的企业开始采用微服务架构来构建他们的应用。微服务架构可以将一个大型的应用拆分成多个小型的服务,每个服务都独立部署、独立运行,并通过轻量级的通信协议进…...
踩坑实录(Second Day)
作为公司的小菜鸟,每天都踩坑应该是一件很正常的事情吧,哈哈哈。今天遇到了比较棘手的问题,以前从来没有遇到过。然后就是在某平台上接的一个 bug 修改的单子,也拿出来和大家分享一下~ 此为第二篇(2024 年 02 月 05 日…...
Linux链表操作全解析
Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表?1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序
一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...
Java编程之桥接模式
定义 桥接模式(Bridge Pattern)属于结构型设计模式,它的核心意图是将抽象部分与实现部分分离,使它们可以独立地变化。这种模式通过组合关系来替代继承关系,从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...
Windows安装Miniconda
一、下载 https://www.anaconda.com/download/success 二、安装 三、配置镜像源 Anaconda/Miniconda pip 配置清华镜像源_anaconda配置清华源-CSDN博客 四、常用操作命令 Anaconda/Miniconda 基本操作命令_miniconda创建环境命令-CSDN博客...
Git常用命令完全指南:从入门到精通
Git常用命令完全指南:从入门到精通 一、基础配置命令 1. 用户信息配置 # 设置全局用户名 git config --global user.name "你的名字"# 设置全局邮箱 git config --global user.email "你的邮箱example.com"# 查看所有配置 git config --list…...
抽象类和接口(全)
一、抽象类 1.概念:如果⼀个类中没有包含⾜够的信息来描绘⼀个具体的对象,这样的类就是抽象类。 像是没有实际⼯作的⽅法,我们可以把它设计成⼀个抽象⽅法,包含抽象⽅法的类我们称为抽象类。 2.语法 在Java中,⼀个类如果被 abs…...
Vue 实例的数据对象详解
Vue 实例的数据对象详解 在 Vue 中,数据对象是响应式系统的核心,也是组件状态的载体。理解数据对象的原理和使用方式是成为 Vue 专家的关键一步。我将从多个维度深入剖析 Vue 实例的数据对象。 一、数据对象的定义方式 1. Options API 中的定义 在 Options API 中,使用 …...
【Zephyr 系列 16】构建 BLE + LoRa 协同通信系统:网关转发与混合调度实战
🧠关键词:Zephyr、BLE、LoRa、混合通信、事件驱动、网关中继、低功耗调度 📌面向读者:希望将 BLE 和 LoRa 结合应用于资产追踪、环境监测、远程数据采集等场景的开发者 📊篇幅预计:5300+ 字 🧭 背景与需求 在许多 IoT 项目中,单一通信方式往往难以兼顾近场数据采集…...
若依项目部署--传统架构--未完待续
若依项目介绍 项目源码获取 #Git工具下载 dnf -y install git #若依项目获取 git clone https://gitee.com/y_project/RuoYi-Vue.git项目背景 随着企业信息化需求的增加,传统开发模式存在效率低,重复劳动多等问题。若依项目通过整合主流技术框架&…...
基于django+vue的健身房管理系统-vue
开发语言:Python框架:djangoPython版本:python3.8数据库:mysql 5.7数据库工具:Navicat12开发软件:PyCharm 系统展示 会员信息管理 员工信息管理 会员卡类型管理 健身项目管理 会员卡管理 摘要 健身房管理…...
