kafka配置SASL/PLAIN 安全认证
1 zookeeper配置启动
1.1 zookeeper添加SASL支持
为zookeeper添加SASL支持,在配置文件zoo.cfg添加
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
1.2 zk_server_jaas.conf文件
新建zk_server_jaas.conf文件,为Zookeeper添加账号认证信息.这个文件你放在哪里随意,只要后面zkEnv配置正确的路径就好了。我是放在/opt/zookeeper/conf/home路径下。zk_server_jaas.conf文件的内容如下
Server {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=“cluster”
password=“clusterpasswd”
user_kafka=“kafkapasswd”;
};`
username和paasword是zk集群之间的认证密码。
user_kafka=“kafkapasswd"定义了一个用户"kafka”,密码是"kafkapasswd",本次测试用户是kafka broker。
1.3 引入jar包
由上一步可发现,认证方式使用的是Kafka的认证类org.apache.kafka.common.security.plain.PlainLoginModule。因此zk需要依赖几个jar包。
在home下新建zk_sasl_dependency目录,从kafka/lib目录下复制以下几个jar包到该目录下。根据kafka版本不同,几个jar包的版本可能不一样
-rw-r--r-- 1 root root 1893564 Aug 29 10:53 kafka-clients-2.0.0.jar
-rw-r--r-- 1 root root 489884 Aug 29 11:14 log4j-1.2.17.jar
-rw-r--r-- 1 root root 370137 Aug 29 10:53 lz4-java-1.4.1.jar
-rw-r--r-- 1 root root 41203 Aug 29 10:53 slf4j-api-1.7.25.jar
-rw-r--r-- 1 root root 12244 Aug 29 10:53 slf4j-log4j12-1.7.25.jar
-rw-r--r-- 1 root root 2019013 Aug 29 10:53 snappy-java-1.1.7.1.jar
[root@sm_qf-bj_hydgpt_192-168-151-168 home]#
1.4.修改zkEnv.sh
在zkEnv.sh添加
for i in /opt/zookeeper/conf/home/zk_sasl_dependency/*.jar
doCLASSPATH="$i:$CLASSPATH"
done
SERVER_JVMFLAGS=" -Djava.security.auth.login.config=/opt/zookeeper/conf/home/zk_server_jaas.conf "
1.5.启动zk服务端
执行./zkServer.sh restart重新启动zk。如果启动异常查看日志排查问题
2 kafka配置和启动
2.1.新建kafka_server_jaas.conf,为kafka添加认证信息
KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="cluster"password="cluster"user_cluster=“clusterpasswd”user_kafka="kafkapasswd" ;
};
Client{org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafkapasswd";
};
KafkaServer,第一行指定了认证方法为PLAIN,usernam和password是kafka的多个broker之间进行认证的账号密码。
user_kafka="kafkapasswd"设置了用户kafka,密码为kafkapswd,用于客户端的生产者和消费者连接认证。
网上的说法是 Client,是kafka作为用户使用zk的认证信息,这里的username和password一定要和zk_server_jaas.conf的配置对的上。
2. 2.在kafka的配置文件开启SASL认证
listeners=SASL_PLAINTEXT://(IP):9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN
allow.everyone.if.no.acl.found=true
2.3 .在server启动脚本JVM参数
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
改为
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/home/kafka_server_jaas.conf"
2.4 启动
./kafka-server-start.sh ../config/server.properties
2.5
kafka服务端正常启动后,应该会有类似下面这行的日志信息,说明认证功能开启成功
Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint((IP),9092,ListenerName(SASL_PLAINTEXT),SASL_PLAINTEXT) (kafka.utils.ZkUtils)
3 kafka的SASL认证功能认证和使用
1.使用kafka脚本认证
我们使用kafka自带的脚本进行认证。
1.新建kafka_client_jaas.conf,为客户端添加认证信息
在/home下新建kafka_client_jaas.conf,添加以下信息
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafka"password="kafkapasswd";
};
2.修改客户端配置信息
修改producer.properties和consumer.properties,添加认证机制
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
3.修改客户端启动脚本
修改kafka-console-producer.sh,配置认证文件kafka_client_jaas.conf,将
export KAFKA_HEAP_OPTS=“-Xmx512M”
export KAFKA_HEAP_OPTS="-Xmx512M -Djava.security.auth.login.config=/home/kafka_client_jaas.conf"
kafka-console-consumer.sh的修改类似。
4.客户端启动并认证
启动consumer
./bin/kafka-console-consumer.sh --bootstrap-server (IP):9092 --topic test --from-beginning --consumer.config config/consumer.properties
启动producer
./bin/kafka-console-producer.sh --broker-list (IP):9092 --topic test --producer.config configoducer.properties
producer端发送消息,consumer端成功接收到消息。
4.Java客户端认证
package com.zte.sdn.oscp.jms.kafka;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;import java.util.Collections;
import java.util.Properties;public class KafkaTest {@Testpublic void testProduct() throws Exception {System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "IP:9092");props.put("acks", "all");props.put("retries", 0);props.put("batch.size", 16384);props.put("linger.ms", 1);props.put("buffer.memory", 33554432);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");Producer<String, String> producer = new KafkaProducer<>(props);while (true){long startTime = System.currentTimeMillis();for (int i = 0; i < 100; i++) {producer.send(new ProducerRecord<>("kafkatest", Integer.toString(i), Integer.toString(i)));}System.out.println(System.currentTimeMillis()-startTime);Thread.sleep(5000);}}@Testpublic void testConsumer() throws Exception {System.setProperty("java.security.auth.login.config", "F:/kafka_client_jaas.conf");Properties props = new Properties();props.put("bootstrap.servers", "(IP):9092");props.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000");props.put("group.id", "kafka_test_group");props.put("session.timeout.ms", "6000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("kafkatest"));while (true) {long startTime = System.currentTimeMillis();ConsumerRecords<String, String> records = consumer.poll(1000);System.out.println(System.currentTimeMillis() - startTime);System.out.println("recieve message number is " + records.count());for (ConsumerRecord<String, String> record : records) {System.out.printf("offset = %d, key = %s, value = %s, partition = %d %n",record.offset(),record.key(),record.value(),record.partition());}}}
}相关文章:
kafka配置SASL/PLAIN 安全认证
1 zookeeper配置启动 1.1 zookeeper添加SASL支持 为zookeeper添加SASL支持,在配置文件zoo.cfg添加 authProvider.1org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthSchemesasl jaasLoginRenew36000001.2 zk_server_jaas.conf文件…...
pdf加密如何解除?这样解除加密很简单
pdf加密如何解除?有时,我们可能会收到一些加密的PDF文件,它们不允许我们对其进行编辑或打印。这时,我们需要使用PDF解密工具,以便能够轻松地解除PDF加密并对其进行编辑。那么接下来就给大家介绍一下pdf加密解除的方法。…...
Ubuntu18.04使用Systemback制作系统镜像并还原
系列文章目录 文章目录 系列文章目录前言一、下载Systemback工具二、制作系统镜像到U盘三、安装制作系统 前言 在Ubuntu系统中开发项目时,有时会希望将项目移植到另外一台计算机(如工控机等)上进行部署,通常会在新计算机中安装Ub…...
OpenCV(十五):拷贝图像
在OpenCV中,拷贝图像数据时有两种方式:深拷贝(Deep Copy)和浅拷贝(Shallow Copy)。这两种拷贝方式的主要区别在于是否创建新的图像副本。 浅拷贝(Shallow Copy)是指将图像对象的指针…...
原神世界中的顺序表:派蒙的趣味数据结构讲解
派蒙,那个总是带着疑问眼神的小家伙,是原神世界中的小精灵。他总是充满好奇心,无论是对新的冒险者,还是对各种奇妙的现象。而他的另一个身份,则是原神世界中的数据结构大师。 一天,派蒙遇到了旅行者小森&a…...
电脑入门:路由器 基本设置操作说明
路由器 基本设置操作说明 首先我们我设置路由器,就需要先登录路由器, 那么怎样登路由器啊? 登录路由器的方法是 在ie的地址栏输入:http://192.168.1.1 输入完成以后直接回车 那么如果你输入正确 这个时候就应该听到有用户名的提示 呵呵 这是怎么回事啊? 不要召集 首…...
搜索与图论-拓扑序列
为什么记录呢 因为不记录全忘了 虽然记了也不一定会看 有向无环图一定有拓扑序列邮箱无环图 - 拓扑图 入度为0的点作为起点入度为0的点入队列枚举出边 t->j删掉当前边,t->j . j的入度减1判断j的入度是否为0,来判断是否加入队列 有环: …...
「MySQL-05」MySQL Workbench的下载和使用
目录 一、MySQL workbench的下载和安装 1. MySQL workbench介绍 2. 到MySQL官网下载mysql workbench 3. 安装workbench 二、创建能远程登录的用户并授权 1. 创建用户oj_client 2. 创建oj数据库 3. 给用户授权 4. 在Linux上登录用户oj_client检查其是否能操作oj数据库 三、使用…...
编译期jni类型转换成字符串
背景: 例如android jni 方法的签名, 这个需要每个用户都要知道具体类型,转化成签名, 要想写好签名, 必须很熟悉 类型对应的签名, 尤其java类对象要加个L, 本文将介绍怎么在编译期过程把类型转化成字符, 多个类型在尽性拼接. 定义基础数据结构 template<char ... ch> str…...
优秀的ui设计作品(合集)
UI设计师需要了解的九个Tips 1.图片类APP排版突破 规则是死的,人是活的。很多时候,如果需求是比较宽要尝试突破原则,用一些另类的排版方式,其实也是做好设计的本质。在图片类app中,错落一些的排版会使你的作品更有魅力…...
【c/c++】c和cpp混合编译
c和cpp混合编译 #ifdef __cplusplus extern "C" { #endifextern int test(int, int);#ifdef __cplusplus } #endif在这段代码中,#ifdef __cplusplus 和 #endif 之间的代码是为了在 C 中使用 C 语言的函数声明和定义时,确保编译器正确地处理 C…...
springboot定制banner
这里有几个定制banner的网站 Text to ASCII Art Generator (TAAG) ASCII Generator IMG2TXT: ASCII Art Made Easy!...
Qt 入门实战教程(目录)
为何我要写Qt入门教程 前置课程 《C自学精简实践教程》 教程特点 1 面向企业开发,你在这里学到的任何一步操作,都会直接在企业里用到。 2 注重设计思路训练,抽象分析问题的能力。 Qt 安装 1.1 Windows Qt 5.12.10下载与安装 1.2 我们…...
Ceph入门到精通-Lunix性能分析工具汇总
出于对Linux操作系统的兴趣,以及对底层知识的强烈欲望,因此整理了这篇文章。本文也可以作为检验基础知识的指标,另外文章涵盖了一个系统的方方面面。如果没有完善的计算机系统知识,网络知识和操作系统知识,文档中的工具…...
服务器端使用django websocket,客户端使用uniapp 请问服务端和客户端群组互发消息的代码怎么写的参考笔记
2023/8/29 19:21:11 服务器端使用django websocket,客户端使用uniapp 请问服务端和客户端群组互发消息的代码怎么写 2023/8/29 19:22:25 在服务器端使用Django WebSocket和客户端使用Uniapp的情况下,以下是代码示例来实现服务器端和客户端之间的群组互发消息。 …...
【考研数学】线性代数第四章 —— 线性方程组(2,线性方程组的通解 | 理论延伸)
文章目录 引言四、线性方程组的通解4.1 齐次线性方程组4.2 非齐次线性方程组 五、方程组解的理论延伸 引言 承接前文,继续学习线性方程组的内容,从方程组的通解开始。 四、线性方程组的通解 4.1 齐次线性方程组 (1)基础解系 —…...
go读取文件的几种方法
一. 整个文件读入内存 直接将数据直接读取入内存,是效率最高的一种方式,但此种方式,仅适用于小文件,对于大文件,则不适合,因为比较浪费内存 1.直接指定文化名读取 在 Go 1.16 开始,ioutil.Rea…...
ChatGPT癌症治疗“困难重重”,真假混讲难辨真假,准确有待提高
近年来,人工智能在医疗领域的应用逐渐增多,其中自然语言处理模型如ChatGPT在提供医疗建议和信息方面引起了广泛关注。然而,最新的研究表明,尽管ChatGPT在许多领域取得了成功,但它在癌症治疗方案上的准确性仍有待提高。…...
docker打包vue vite前端项目
打包vue vite 前端项目 1.打包时将测试删除 2.修改配置 3.打包项目 npm run build 显示成功(黄的也不知道是啥) 打包好的前端文件放入 4.配置 default.conf upstream wms-app {server 你自己的ip加端口 ;server 192.168.xx.xx:8080 ; } server { …...
zookeeper 查询注册的 dubbo 服务
1. 连接zookeeper 服务端 使用bin 目录下zk客户端连接服务器, ./zkCli.sh -server 127.0.0.1:2181 2. 查询Dubbo 服务 # 查询所有服务 ls /dubbo # 查询指定服务调用 ls /dubbo/服务名(接口地址)/consumers # 查询指定服务调用 ls /dubbo/服务名(接口地址)/pr…...
多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
XCTF-web-easyupload
试了试php,php7,pht,phtml等,都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接,得到flag...
Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...
Docker 运行 Kafka 带 SASL 认证教程
Docker 运行 Kafka 带 SASL 认证教程 Docker 运行 Kafka 带 SASL 认证教程一、说明二、环境准备三、编写 Docker Compose 和 jaas文件docker-compose.yml代码说明:server_jaas.conf 四、启动服务五、验证服务六、连接kafka服务七、总结 Docker 运行 Kafka 带 SASL 认…...
《通信之道——从微积分到 5G》读书总结
第1章 绪 论 1.1 这是一本什么样的书 通信技术,说到底就是数学。 那些最基础、最本质的部分。 1.2 什么是通信 通信 发送方 接收方 承载信息的信号 解调出其中承载的信息 信息在发送方那里被加工成信号(调制) 把信息从信号中抽取出来&am…...
spring:实例工厂方法获取bean
spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂ÿ…...
ServerTrust 并非唯一
NSURLAuthenticationMethodServerTrust 只是 authenticationMethod 的冰山一角 要理解 NSURLAuthenticationMethodServerTrust, 首先要明白它只是 authenticationMethod 的选项之一, 并非唯一 1 先厘清概念 点说明authenticationMethodURLAuthenticationChallenge.protectionS…...
令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍
文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结: 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析: 实际业务去理解体会统一注…...
