当前位置: 首页 > news >正文

Kafka SASL_SSL双重认证

文章目录

    • 1. 背景
    • 2. 环境
    • 3. 操作步骤
      • 3.1 生成SSL证书
      • 3.2 配置zookeeper认证
      • 3.3 配置kafka安全认证
      • 3.4 使用kafka客户端进行验证
      • 3.5 使用Java端代码进行认证

1. 背景

kafka提供了多种安全认证机制,主要分为SASL和SSL两大类。

  • SASL: 是一种身份验证机制,用于在客户端和服务器之间进行身份验证的过程,其中SASL/PLAIN是基于账号密码的认证方式。
  • SSL: 是一种加密协议,用于在网络通信中提供数据的保密性和完整性。它使用公钥和私钥来建立安全的连接,并对传输的数据进行加密和解密,以防止未经授权的访问和篡改。

在 Kafka 中启用 SASL_SSL 安全协议时,SASL 用于客户端和服务器之间的身份验证,SSL 则用于加密和保护数据的传输。不仅提供身份验证,还提供加密和数据保护的功能。

因工作需要,需要在测试环境搭建一套基于SASL_SSL协议的kafka环境。坑比较多,经过两天的研究终于搞定了,特在此记录下。

2. 环境

  • 操作系统:linux
  • kafka版本:kafka_2.13-2.7.1
  • zookeeper版本:apache-zookeeper-3.7.0
  • 应用程序版本:spring-boot-2.6.7、JDK1.8

3. 操作步骤

  1. 生成SSL证书
  2. 配置zookeeper
  3. 配置kafka
  4. 前三步配置完成后kafka就开启了SASL_SSL双重认证,可以使用kafka自带的客户端进行测试(3.4),
  5. 在业务代码中使用请查看(3.5)

3.1 生成SSL证书

按照步骤一步一步操作,生成服务器/客户端的SSL证书。也就是公钥与私钥

参考:【SSL协议】生成SSL证书 - lihewei - 博客园 (cnblogs.com)

3.2 配置zookeeper认证

第一步: 在apache-zookeeper-3.7.0/conf 目录下创建 kafka_zk_jaas.conf 配置文件(名称任意),定义了两个用户,可提供给生产者和消费者使用,格式为:user_用户名=“用户密码”,内容如下:

Server {org.apache.zookeeper.server.auth.DigestLoginModule requireduser_admin="1qaz@WSX"user_kafka="1qaz@WSX";
};

第二步: zookeeper配置文件zoo.cfg中新增SASL认证配置,如下:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

第三步: 在apache-zookeeper-3.7.0/bin/zkServer.sh脚本中新增jvm参数,让其启动时加载jaas配置文件

export SERVER_JVMFLAGS="-Xmx${ZK_SERVER_HEAP}m $SERVER_JVMFLAGS -Djava.security.auth.login.config=/home/crbt/local/apache-zookeeper-3.7.0/conf/kafka_zk_jaas.conf"

3.3 配置kafka安全认证

第一步: /home/crbt/local/kafka_2.13-2.7.1/config目录下创建kafka-server-jaas.conf和kafka-client-jaas.conf配置文件,内容如下:

kafka-server-jaas.conf

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="admin"password="1qaz@WSX"user_admin="1qaz@WSX"user_kafka="1qaz@WSX";
};Client {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="1qaz@WSX";
};

kafka-client-jaas.conf

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="1qaz@WSX";
};

第二步: 在kafka启动脚本(kafka_2.13-2.7.1/bin/kafka-server-start.sh)配置环境变量,指定jaas.conf文件,增加如下代码:

增加环境变量: -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf

...if [ "x$KAFKA_HEAP_OPTS" = "x" ]; thenexport KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/crbt/local/kafka_2.13-2.7.1/config/kafka-server-jaas.conf"
fi...

**第三步:**修改 kafka 的 server.properties配置文件

#listeners=SSL://10.1.61.121:9092
host.name=node1
#listeners=PLAINTEXT://node1:9092,SSL://node1:9093
listeners=SASL_SSL://node1:9093
#advertised.listeners=SSL://node1:9092
advertised.listeners=SASL_SSL://node1:9093
ssl.keystore.location=/home/crbt/lihw/ca/server/server.keystore.jks
ssl.keystore.password=Q06688
ssl.key.password=Q06688
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688
ssl.client.auth=required
ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type=JKS 
ssl.truststore.type=JKS 
# kafka2.0.x开始,将ssl.endpoint.identification.algorithm设置为了HTTPS,即:需要验证主机名
# 如果不需要验证主机名,那么可以这么设置 ssl.endpoint.identification.algorithm=即可
ssl.endpoint.identification.algorithm=
# 设置内部访问也用SSL,默认值为security.inter.broker.protocol=PLAINTEXT
security.inter.broker.protocol=SASL_SSL
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=true

注意:这里有个坑,生成SSL密钥私钥时指定了主机的hostname,这里也要配置kafka所在服务器的hostname

3.4 使用kafka客户端进行验证

第一步: 修改kafka/config/下的 consumer.properties、producer.properties,配置SASL_SSL验证的基本信息。

consumer.properties:

bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

producer.properties:

bootstrap.servers=node1:9093
security.protocol=SASL_SSL
ssl.truststore.location=/home/crbt/lihw/ca/trust/server.truststore.jks
ssl.truststore.password=Q06688sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";

第二步: 使用命令行操作时,让其找到上述设置的SASL_SSL配置文件( --producer.config …/config/producer.properties)

#生产
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-producer.sh --bootstrap-server node1:9093  --topic first --producer.config ../config/producer.properties
>aaa
>bbb
>ccc
>#消费
crbt@node1:/home/crbt/local/kafka_2.13-2.7.1/bin>./kafka-console-consumer.sh --bootstrap-server node1:9093 --topic first -consumer.config /home/crbt/local/kafka_2.13-2.7.1/config/consumer.properties
aaa
bbb
ccc

3.5 使用Java端代码进行认证

第一步: yaml 配置文件

spring:kafka:bootstrap-servers: localhost:9093properties:sasl:mechanism: PLAINjaas:#此处填写 SASL登录时分配的用户名密码(注意password结尾;)config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="1qaz@WSX";security:protocol: SASL_SSLssl:trust-store-location: /home/crbt/lihw/ca/trust/server.truststore.jkstrust-store-password: Q06688key-store-type: JKSproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerbatch-size: 106384acks: -1retries: 3properties:linger-ms: 1retry.backoff.ms: 1000buffer-memory: 33554432

第二步: 使用 kafkaTemplate 的方式,配置一个 config

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class KafkaProducerConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.retries}")private String retries;@Value("${spring.kafka.producer.batch-size}")private String batchSize;@Value("${spring.kafka.producer.properties.linger-ms}")private int lingerMs;@Value("${spring.kafka.producer.properties.buffer-memory}")private int bufferMemory;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Value("${spring.kafka.properties.security.protocol}")private String protocol;@Value("${spring.kafka.properties.sasl.mechanism}")private String mechanism;@Value("${spring.kafka.ssl.trust-store-location}")private String trustStoreLocation;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-type}")private String keyStoreType;@Value("${spring.kafka.properties.sasl.jaas.config}")private String jaasConfig;@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}/*** the producer factory config*/@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<String, Object>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);props.put(ProducerConfig.ACKS_CONFIG, acks);props.put(ProducerConfig.RETRIES_CONFIG, retries);props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);props.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);props.put("security.protocol", protocol);props.put(SaslConfigs.SASL_MECHANISM, mechanism);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, trustStoreLocation);props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, trustStorePassword);props.put(SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, keyStoreType);props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasConfig);return new DefaultKafkaProducerFactory<String, String>(props);}
}

相关文章:

Kafka SASL_SSL双重认证

文章目录 1. 背景2. 环境3. 操作步骤3.1 生成SSL证书3.2 配置zookeeper认证3.3 配置kafka安全认证3.4 使用kafka客户端进行验证3.5 使用Java端代码进行认证 1. 背景 kafka提供了多种安全认证机制&#xff0c;主要分为SASL和SSL两大类。 SASL&#xff1a; 是一种身份验证机制&…...

css新手教程

css新手教程 课程&#xff1a;14、盒子模型及边框使用_哔哩哔哩_bilibili 一.什么是CSS 1.什么是CSS Cascading Style Sheet 层叠样式表。 CSS&#xff1a;表现&#xff08;美化网页&#xff09; 字体&#xff0c;颜色&#xff0c;边距&#xff0c;高度&#xff0c;宽度&am…...

spring boot(2.4.x之前版本)和spring cloud项目中配置文件的作用

spring 版本以及相关的组件一直在变化&#xff0c;其中一些类或者功能在低版本中有&#xff0c;高版本中去掉了&#xff0c;有的新功能只在高版本有。 为了防止理解问题&#xff0c;pom.xml 版本依赖如下 <parent><groupId>org.springframework.boot</groupId…...

web前后端小坑记录

游戏服务器过年这段时间忙完了&#xff0c;好久没看web了&#xff0c;重温一下。发现竟然没有文章记录这些修BUG的过程&#xff0c;记录一下。 目录 如何处理F5刷新&#xff1f; 如何处理F5刷新&#xff1f; 后端应该发现路由不存在&#xff0c;直接返回打包好的index.html就…...

股票K线简介

股票K线&#xff08;K-Line&#xff09;是用于表示股票价格走势的图形&#xff0c;主要由四个关键价格点组成&#xff1a;开盘价、收盘价、最高价和最低价。K线图广泛应用于股票市场技术分析中&#xff0c;它提供了丰富的信息&#xff0c;帮助分析师和投资者理解市场的行情走势…...

路由器、路由器的构成、交换结构

目录 1 路由器 1.1 路由器的结构 “转发”和“路由选择”的区别 1.1.1 输入端口对线路上收到的分组的处理 1.1.2 输出端口将交换结构传送来的分组发送到线路 2.2 交换结构 2.2.1 通过存储器 2.2.2 通过总线 2.2.3 通过纵横交换结构 (crossbar switch fabric) 1 路由器…...

【Mysql】整理

Mysql整理与总结 整理Mysql的基本内容供回顾。 参考&#xff1a; [1]. 掘金.MySQL三大日志(binlog,redolog,undolog)详解 [2]. Javaguide.MySQL三大日志(binlog、redo log和undo log)详解...

项目02《游戏-08-开发》Unity3D

基于 项目02《游戏-07-开发》Unity3D &#xff0c; 本次任务做物品相互与详情的功能&#xff0c; 首先要做 点击相应&#xff0c; 接下来用接口实现点击相应事件&#xff0c;具体到代码中&#xff0c;我们找到需要响应鼠标事件的对象&#xff0c; 双击PackageCell…...

【数据库原理及应用】简答题归纳总结

第一章 数据库概论 1.人工管理阶段数据管理的特点&#xff1a; &#xff08;1&#xff09;数据不保存在机器中 &#xff08;2&#xff09;无专用的软件对数据进行管理 &#xff08;3&#xff09;只有程序的概念&#xff0c;没有文件的概念 &#xff08;4&#xff09;数据面向程…...

通过无线打通两个路由器

通过无线打通两个路由器 上网向导无线连接 配置比较简单&#xff0c;有些路由器支持有些不支持&#xff0c;支持的大致就是下面的方法&#xff0c;不过不同型号面板不一样&#xff0c;这里主要学习方法&#xff0c;所以不做路由器型号介绍。 重要的事情说三遍&#xff1a;学习要…...

idea 配置文件,中文出现乱码如何解决

在进行 spring 项目开发时&#xff0c;项目中有 application.properties/application.yml 等配置文件&#xff0c;在配置文件中使用中文注解时可能会出现乱码的情况&#xff0c;如下&#xff1a; 这是因为 idea 配置文件的编码和其他文件的不同&#xff0c;我们需要修改配置文件…...

网络协议梳理

1 引言 在计算机网络中要做到有条不紊地交换数据&#xff0c;就必须遵守一些事先约定好的规则。这些规则明确规定了所交换的数据的格式以及有关的同步问题。这里所说的同步不是狭义的&#xff08;即同频或同频同相&#xff09;而是广义的&#xff0c;即在一定的条件下应当发生什…...

14. 【Linux教程】文件压缩与解压

文件压缩与解压 前面小节介绍了如何对文件和目录删除、移动操作&#xff0c;本小节介绍如何使用命令对文件和目录进行压缩与解压操作&#xff0c;常见的压缩包格式有 .bz2、.Z、.gz、.zip、.xz&#xff0c;压缩之后的文件或目录占用更少的空间。 1. tar 命令介绍 下面列举 ta…...

ruoyi-nbcio中xxl-job的安装与使用

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a; http://122.227.135.243:9666 更多nbcio-boot功能请看演示系统 gitee源代码地址 后端代码&#xff1a; https://gitee.com/nbach…...

从零学算法162

162.峰值元素是指其值严格大于左右相邻值的元素。 给你一个整数数组 nums&#xff0c;找到峰值元素并返回其索引。数组可能包含多个峰值&#xff0c;在这种情况下&#xff0c;返回 任何一个峰值 所在位置即可。 你可以假设 nums[-1] nums[n] -∞ 。 你必须实现时间复杂度为 O…...

5.0 ZooKeeper 数据模型 znode 结构详解

数据模型 在 zookeeper 中&#xff0c;可以说 zookeeper 中的所有存储的数据是由 znode 组成的&#xff0c;节点也称为 znode&#xff0c;并以 key/value 形式存储数据。 整体结构类似于 linux 文件系统的模式以树形结构存储。其中根路径以 / 开头。 进入 zookeeper 安装的 …...

《数电》理论笔记-第1章-逻辑代数基础

参考&#xff1a;视频 和 《数字电路与逻辑设计》 电子书 一&#xff0c;第1章 逻辑代数基础 1 数字量和模拟量 略 2 数制&#xff08;十进制&#xff0c;二进制&#xff0c;八进制和十六进制&#xff09; 拨电话&#xff08;BoDH&#xff09;---&#xff08;2八10十六&…...

计算指定路径下的可用空间大小

方法一、使用psutil库 import psutildef check_disk_space(path):usage psutil.disk_usage(path)## 1GB 1 * 1024 * 1024 * 1024字节if usage.free > 1 * 1024 * 1024 * 1024:return 1else:return 0disk_path "/home" result check_disk_space(disk_path) pr…...

2023年全球软件架构师峰会(ArchSummit上海站):核心内容与学习收获(附大会核心PPT下载)

微服务架构是当今软件架构的主流趋势之一。随着云计算和分布式系统的普及&#xff0c;越来越多的企业开始采用微服务架构来构建他们的应用。微服务架构可以将一个大型的应用拆分成多个小型的服务&#xff0c;每个服务都独立部署、独立运行&#xff0c;并通过轻量级的通信协议进…...

踩坑实录(Second Day)

作为公司的小菜鸟&#xff0c;每天都踩坑应该是一件很正常的事情吧&#xff0c;哈哈哈。今天遇到了比较棘手的问题&#xff0c;以前从来没有遇到过。然后就是在某平台上接的一个 bug 修改的单子&#xff0c;也拿出来和大家分享一下~ 此为第二篇&#xff08;2024 年 02 月 05 日…...

synchronized 学习

学习源&#xff1a; https://www.bilibili.com/video/BV1aJ411V763?spm_id_from333.788.videopod.episodes&vd_source32e1c41a9370911ab06d12fbc36c4ebc 1.应用场景 不超卖&#xff0c;也要考虑性能问题&#xff08;场景&#xff09; 2.常见面试问题&#xff1a; sync出…...

Prompt Tuning、P-Tuning、Prefix Tuning的区别

一、Prompt Tuning、P-Tuning、Prefix Tuning的区别 1. Prompt Tuning(提示调优) 核心思想:固定预训练模型参数,仅学习额外的连续提示向量(通常是嵌入层的一部分)。实现方式:在输入文本前添加可训练的连续向量(软提示),模型只更新这些提示参数。优势:参数量少(仅提…...

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする

日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...

在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module

1、为什么要修改 CONNECT 报文&#xff1f; 多租户隔离&#xff1a;自动为接入设备追加租户前缀&#xff0c;后端按 ClientID 拆分队列。零代码鉴权&#xff1a;将入站用户名替换为 OAuth Access-Token&#xff0c;后端 Broker 统一校验。灰度发布&#xff1a;根据 IP/地理位写…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

【论文笔记】若干矿井粉尘检测算法概述

总的来说&#xff0c;传统机器学习、传统机器学习与深度学习的结合、LSTM等算法所需要的数据集来源于矿井传感器测量的粉尘浓度&#xff0c;通过建立回归模型来预测未来矿井的粉尘浓度。传统机器学习算法性能易受数据中极端值的影响。YOLO等计算机视觉算法所需要的数据集来源于…...

spring:实例工厂方法获取bean

spring处理使用静态工厂方法获取bean实例&#xff0c;也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下&#xff1a; 定义实例工厂类&#xff08;Java代码&#xff09;&#xff0c;定义实例工厂&#xff08;xml&#xff09;&#xff0c;定义调用实例工厂&#xff…...

【Go】3、Go语言进阶与依赖管理

前言 本系列文章参考自稀土掘金上的 【字节内部课】公开课&#xff0c;做自我学习总结整理。 Go语言并发编程 Go语言原生支持并发编程&#xff0c;它的核心机制是 Goroutine 协程、Channel 通道&#xff0c;并基于CSP&#xff08;Communicating Sequential Processes&#xff0…...

Mac软件卸载指南,简单易懂!

刚和Adobe分手&#xff0c;它却总在Library里给你写"回忆录"&#xff1f;卸载的Final Cut Pro像电子幽灵般阴魂不散&#xff1f;总是会有残留文件&#xff0c;别慌&#xff01;这份Mac软件卸载指南&#xff0c;将用最硬核的方式教你"数字分手术"&#xff0…...

CocosCreator 之 JavaScript/TypeScript和Java的相互交互

引擎版本&#xff1a; 3.8.1 语言&#xff1a; JavaScript/TypeScript、C、Java 环境&#xff1a;Window 参考&#xff1a;Java原生反射机制 您好&#xff0c;我是鹤九日&#xff01; 回顾 在上篇文章中&#xff1a;CocosCreator Android项目接入UnityAds 广告SDK。 我们简单讲…...