当前位置: 首页 > news >正文

kafka配置SASL/PLAIN 安全认证

1 zookeeper配置启动

1.1 zookeeper添加SASL支持

为zookeeper添加SASL支持,在配置文件zoo.cfg添加

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

1.2 zk_server_jaas.conf文件

新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/opt/zookeeper/conf/home路径下。zk_server_jaas.conf文件的内容如下
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“cluster”
password=“clusterpasswd”
user_kafka=“kafkapasswd”;
};`
username和paasword是zk集群之间的认证密码。
user_kafka=“kafkapasswd"定义了一个用户"kafka”,密码是"kafkapasswd",本次测试用户是kafka broker。

1.3 引入jar包

由上一步可发现,认证方式使用的是Kafka的认证类org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依赖几个jar包。
在home下新建zk_sasl_dependency目录,从kafka/lib目录下复制以下几个jar包到该目录下。根据kafka版本不同,几个jar包的版本可能不一样

-rw-r--r-- 1 root root 1893564 Aug 29 10:53 kafka-clients-2.0.0.jar
-rw-r--r-- 1 root root  489884 Aug 29 11:14 log4j-1.2.17.jar
-rw-r--r-- 1 root root  370137 Aug 29 10:53 lz4-java-1.4.1.jar
-rw-r--r-- 1 root root   41203 Aug 29 10:53 slf4j-api-1.7.25.jar
-rw-r--r-- 1 root root   12244 Aug 29 10:53 slf4j-log4j12-1.7.25.jar
-rw-r--r-- 1 root root 2019013 Aug 29 10:53 snappy-java-1.1.7.1.jar
[root@sm_qf-bj_hydgpt_192-168-151-168 home]# 

1.4.修改zkEnv.sh

在zkEnv.sh添加


for i in /opt/zookeeper/conf/home/zk_sasl_dependency/*.jar
doCLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/zookeeper/conf/home/zk_server_jaas.conf "

1.5.启动zk服务端

执行./zkServer.sh restart重新启动zk。如果启动异常查看日志排查问题

2 kafka配置和启动

2.1.新建kafka_server_jaas.conf,为kafka添加认证信息

KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="cluster"password="cluster"user_cluster=“clusterpasswd”user_kafka="kafkapasswd" ;
};
Client{org.apache.kafka.common.security.plain.PlainLoginModule required  username="kafka"  password="kafkapasswd";  
};

KafkaServer,第一行指定了认证方法为PLAIN,usernam和password是kafka的多个broker之间进行认证的账号密码。
user_kafka="kafkapasswd"设置了用户kafka,密码为kafkapswd,用于客户端的生产者和消费者连接认证。
网上的说法是 Client,是kafka作为用户使用zk的认证信息,这里的username和password一定要和zk_server_jaas.conf的配置对的上。

2. 2.在kafka的配置文件开启SASL认证

listeners=SASL_PLAINTEXT://(IP):9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN 
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true

2.3 .在server启动脚本JVM参数

export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
改为
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_server_jaas.conf"

2.4 启动

./kafka-server-start.sh ../config/server.properties

2.5

kafka服务端正常启动后,应该会有类似下面这行的日志信息,说明认证功能开启成功

Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)

3 kafka的SASL认证功能认证和使用

1.使用kafka脚本认证

我们使用kafka自带的脚本进行认证。

1.新建kafka_client_jaas.conf,为客户端添加认证信息

在/home下新建kafka_client_jaas.conf,添加以下信息

KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafkapasswd";
};

2.修改客户端配置信息

修改producer.properties和consumer.properties,添加认证机制

security.protocol=SASL_PLAINTEXT 
sasl.mechanism=PLAIN 

3.修改客户端启动脚本

修改kafka-console-producer.sh,配置认证文件kafka_client_jaas.conf,将

export KAFKA_HEAP_OPTS=“-Xmx512M”

export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"

kafka-console-consumer.sh的修改类似。

4.客户端启动并认证

启动consumer

./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties

启动producer

./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config configoducer.properties

producer端发送消息,consumer端成功接收到消息。

4.Java客户端认证

package com.zte.sdn.oscp.jms.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;import java.util.Collections;
import java.util.Properties;public class KafkaTest {@Testpublic void testProduct() throws Exception {System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "IP:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");Producer<String, String> producer = new KafkaProducer<>(props);while (true){long startTime = System.currentTimeMillis();for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<>("kafkatest", Integer.toString(i), Integer.toString(i)));}System.out.println(System.currentTimeMillis()-startTime);Thread.sleep(5000);}}@Testpublic void testConsumer() throws Exception {System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "(IP):9092");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("group.id", "kafka_test_group");props.put("session.timeout.ms", "6000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("kafkatest"));while (true) {long startTime = System.currentTimeMillis();ConsumerRecords<String, String> records = consumer.poll(1000);System.out.println(System.currentTimeMillis() - startTime);System.out.println("recieve message number is " + records.count());for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",record.offset(),record.key(),record.value(),record.partition());}}}
}

相关文章:

kafka配置SASL/PLAIN 安全认证

1 zookeeper配置启动 1.1 zookeeper添加SASL支持 为zookeeper添加SASL支持&#xff0c;在配置文件zoo.cfg添加 authProvider.1org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthSchemesasl jaasLoginRenew36000001.2 zk_server_jaas.conf文件…...

pdf加密如何解除?这样解除加密很简单

pdf加密如何解除&#xff1f;有时&#xff0c;我们可能会收到一些加密的PDF文件&#xff0c;它们不允许我们对其进行编辑或打印。这时&#xff0c;我们需要使用PDF解密工具&#xff0c;以便能够轻松地解除PDF加密并对其进行编辑。那么接下来就给大家介绍一下pdf加密解除的方法。…...

Ubuntu18.04使用Systemback制作系统镜像并还原

系列文章目录 文章目录 系列文章目录前言一、下载Systemback工具二、制作系统镜像到U盘三、安装制作系统 前言 在Ubuntu系统中开发项目时&#xff0c;有时会希望将项目移植到另外一台计算机&#xff08;如工控机等&#xff09;上进行部署&#xff0c;通常会在新计算机中安装Ub…...

OpenCV(十五):拷贝图像

在OpenCV中&#xff0c;拷贝图像数据时有两种方式&#xff1a;深拷贝&#xff08;Deep Copy&#xff09;和浅拷贝&#xff08;Shallow Copy&#xff09;。这两种拷贝方式的主要区别在于是否创建新的图像副本。 浅拷贝&#xff08;Shallow Copy&#xff09;是指将图像对象的指针…...

原神世界中的顺序表:派蒙的趣味数据结构讲解

派蒙&#xff0c;那个总是带着疑问眼神的小家伙&#xff0c;是原神世界中的小精灵。他总是充满好奇心&#xff0c;无论是对新的冒险者&#xff0c;还是对各种奇妙的现象。而他的另一个身份&#xff0c;则是原神世界中的数据结构大师。 一天&#xff0c;派蒙遇到了旅行者小森&a…...

电脑入门:路由器 基本设置操作说明

路由器 基本设置操作说明 首先我们我设置路由器,就需要先登录路由器, 那么怎样登路由器啊? 登录路由器的方法是 在ie的地址栏输入:http://192.168.1.1 输入完成以后直接回车 那么如果你输入正确 这个时候就应该听到有用户名的提示 呵呵 这是怎么回事啊? 不要召集 首…...

搜索与图论-拓扑序列

为什么记录呢 因为不记录全忘了 虽然记了也不一定会看 有向无环图一定有拓扑序列邮箱无环图 - 拓扑图 入度为0的点作为起点入度为0的点入队列枚举出边 t->j删掉当前边&#xff0c;t->j . j的入度减1判断j的入度是否为0&#xff0c;来判断是否加入队列 有环&#xff1a; …...

「MySQL-05」MySQL Workbench的下载和使用

目录 一、MySQL workbench的下载和安装 1. MySQL workbench介绍 2. 到MySQL官网下载mysql workbench 3. 安装workbench 二、创建能远程登录的用户并授权 1. 创建用户oj_client 2. 创建oj数据库 3. 给用户授权 4. 在Linux上登录用户oj_client检查其是否能操作oj数据库 三、使用…...

编译期jni类型转换成字符串

背景: 例如android jni 方法的签名, 这个需要每个用户都要知道具体类型,转化成签名, 要想写好签名, 必须很熟悉 类型对应的签名, 尤其java类对象要加个L, 本文将介绍怎么在编译期过程把类型转化成字符, 多个类型在尽性拼接. 定义基础数据结构 template<char ... ch> str…...

优秀的ui设计作品(合集)

UI设计师需要了解的九个Tips 1.图片类APP排版突破 规则是死的&#xff0c;人是活的。很多时候&#xff0c;如果需求是比较宽要尝试突破原则&#xff0c;用一些另类的排版方式&#xff0c;其实也是做好设计的本质。在图片类app中&#xff0c;错落一些的排版会使你的作品更有魅力…...

【c/c++】c和cpp混合编译

c和cpp混合编译 #ifdef __cplusplus extern "C" { #endifextern int test(int, int);#ifdef __cplusplus } #endif在这段代码中&#xff0c;#ifdef __cplusplus 和 #endif 之间的代码是为了在 C 中使用 C 语言的函数声明和定义时&#xff0c;确保编译器正确地处理 C…...

springboot定制banner

这里有几个定制banner的网站 Text to ASCII Art Generator (TAAG) ASCII Generator IMG2TXT: ASCII Art Made Easy!...

Qt 入门实战教程(目录)

为何我要写Qt入门教程 前置课程 《C自学精简实践教程》 教程特点 1 面向企业开发&#xff0c;你在这里学到的任何一步操作&#xff0c;都会直接在企业里用到。 2 注重设计思路训练&#xff0c;抽象分析问题的能力。 Qt 安装 1.1 Windows Qt 5.12.10下载与安装 1.2 我们…...

Ceph入门到精通-Lunix性能分析工具汇总

出于对Linux操作系统的兴趣&#xff0c;以及对底层知识的强烈欲望&#xff0c;因此整理了这篇文章。本文也可以作为检验基础知识的指标&#xff0c;另外文章涵盖了一个系统的方方面面。如果没有完善的计算机系统知识&#xff0c;网络知识和操作系统知识&#xff0c;文档中的工具…...

服务器端使用django websocket,客户端使用uniapp 请问服务端和客户端群组互发消息的代码怎么写的参考笔记

2023/8/29 19:21:11 服务器端使用django websocket,客户端使用uniapp 请问服务端和客户端群组互发消息的代码怎么写 2023/8/29 19:22:25 在服务器端使用Django WebSocket和客户端使用Uniapp的情况下&#xff0c;以下是代码示例来实现服务器端和客户端之间的群组互发消息。 …...

【考研数学】线性代数第四章 —— 线性方程组(2,线性方程组的通解 | 理论延伸)

文章目录 引言四、线性方程组的通解4.1 齐次线性方程组4.2 非齐次线性方程组 五、方程组解的理论延伸 引言 承接前文&#xff0c;继续学习线性方程组的内容&#xff0c;从方程组的通解开始。 四、线性方程组的通解 4.1 齐次线性方程组 &#xff08;1&#xff09;基础解系 —…...

go读取文件的几种方法

一. 整个文件读入内存 直接将数据直接读取入内存&#xff0c;是效率最高的一种方式&#xff0c;但此种方式&#xff0c;仅适用于小文件&#xff0c;对于大文件&#xff0c;则不适合&#xff0c;因为比较浪费内存 1.直接指定文化名读取 在 Go 1.16 开始&#xff0c;ioutil.Rea…...

ChatGPT癌症治疗“困难重重”,真假混讲难辨真假,准确有待提高

近年来&#xff0c;人工智能在医疗领域的应用逐渐增多&#xff0c;其中自然语言处理模型如ChatGPT在提供医疗建议和信息方面引起了广泛关注。然而&#xff0c;最新的研究表明&#xff0c;尽管ChatGPT在许多领域取得了成功&#xff0c;但它在癌症治疗方案上的准确性仍有待提高。…...

docker打包vue vite前端项目

打包vue vite 前端项目 1.打包时将测试删除 2.修改配置 3.打包项目 npm run build 显示成功&#xff08;黄的也不知道是啥&#xff09; 打包好的前端文件放入 4.配置 default.conf upstream wms-app {server 你自己的ip加端口 ;server 192.168.xx.xx:8080 ; } server { …...

zookeeper 查询注册的 dubbo 服务

1. 连接zookeeper 服务端 使用bin 目录下zk客户端连接服务器&#xff0c; ./zkCli.sh -server 127.0.0.1:2181 2. 查询Dubbo 服务 # 查询所有服务 ls /dubbo # 查询指定服务调用 ls /dubbo/服务名(接口地址)/consumers # 查询指定服务调用 ls /dubbo/服务名(接口地址)/pr…...

挑战杯推荐项目

“人工智能”创意赛 - 智能艺术创作助手&#xff1a;借助大模型技术&#xff0c;开发能根据用户输入的主题、风格等要求&#xff0c;生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用&#xff0c;帮助艺术家和创意爱好者激发创意、提高创作效率。 ​ - 个性化梦境…...

基于服务器使用 apt 安装、配置 Nginx

&#x1f9fe; 一、查看可安装的 Nginx 版本 首先&#xff0c;你可以运行以下命令查看可用版本&#xff1a; apt-cache madison nginx-core输出示例&#xff1a; nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

【Redis技术进阶之路】「原理分析系列开篇」分析客户端和服务端网络诵信交互实现(服务端执行命令请求的过程 - 初始化服务器)

服务端执行命令请求的过程 【专栏简介】【技术大纲】【专栏目标】【目标人群】1. Redis爱好者与社区成员2. 后端开发和系统架构师3. 计算机专业的本科生及研究生 初始化服务器1. 初始化服务器状态结构初始化RedisServer变量 2. 加载相关系统配置和用户配置参数定制化配置参数案…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

vue3 字体颜色设置的多种方式

在Vue 3中设置字体颜色可以通过多种方式实现&#xff0c;这取决于你是想在组件内部直接设置&#xff0c;还是在CSS/SCSS/LESS等样式文件中定义。以下是几种常见的方法&#xff1a; 1. 内联样式 你可以直接在模板中使用style绑定来设置字体颜色。 <template><div :s…...

Nginx server_name 配置说明

Nginx 是一个高性能的反向代理和负载均衡服务器&#xff0c;其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机&#xff08;Virtual Host&#xff09;。 1. 简介 Nginx 使用 server_name 指令来确定…...

SpringCloudGateway 自定义局部过滤器

场景&#xff1a; 将所有请求转化为同一路径请求&#xff08;方便穿网配置&#xff09;在请求头内标识原来路径&#xff0c;然后在将请求分发给不同服务 AllToOneGatewayFilterFactory import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; impor…...

云原生安全实战:API网关Kong的鉴权与限流详解

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 一、基础概念 1. API网关&#xff08;API Gateway&#xff09; API网关是微服务架构中的核心组件&#xff0c;负责统一管理所有API的流量入口。它像一座…...