当前位置: 首页 > article >正文

单机Kafka配置ssl并在springboot使用

目录

  • SSL证书
    • 生成根证书
    • 生成服务端和客户端证书
      • 生成keystore.jks和truststore.jks
      • 辅助脚本
      • 单独生成truststore.jks
  • 环境配置
    • hosts文件
    • kafka server.properties配置ssl
  • 启动kafka
  • kafka基础操作
  • springboot集成
    • 准备工作
    • 需要配置的文件
    • 开始消费

SSL证书

证书主要包含两大类,一个是根证书,用于签发和认证证书。其他证书可以用同一个根证书签发,也可以用不同的根证书签发各自的证书,使用同一个的话比较方便管理,这样所有节点的trust可以公用,即只需要生成一次,其他节点复制就可以。
整个证书生成过程大概如下图:
在这里插入图片描述
最终用于认证的是keystore.jks和truststore.jks,两个证书的作用分别是:
keystore.jks:证明自己的身份,自己的keystore.jks是由别人truststore.jks包含的ca-cert.pem签发就可以证明
truststore.jks:认证别人是否可信,看到别人的keystore.jks里有自己truststore.jks包含的ca-cert.pem就认为可信
这里要注意的是,如果要信任别人,就要在truststore中导入别人的根证书,这里是因为用的同一个根证书签发,所以导入的根证书一样,否则应该交叉导入,也就是客户端导入服务端的,服务端导入客户端的。

备注:后面的脚本直接复制使用有可能会出现下面的错误,这是由于不同系统的换行符不一致导致的,需要转换成对应系统兼容的

$'\r': command not found
: invalid optionet: -
set: usage: set [-abefhkmnptuvxBCHP] [-o option-name] [--] [arg ...]

以在Linux中使用为例,可以使用Notepad++按照以下操作路径修改一下保存之后覆盖掉就可以了。

在这里插入图片描述

生成根证书

生成根证书包含流程图的第一步,这时会生成根证书和他的私钥,命令脚本如下:

#!/bin/bashset -e# === 配置部分 需要根据自己的实际情况进行调整===
#根证书
CA_CERT="ca-cert.pem"
#根证书私钥
CA_KEY="ca-key.pem"
#有效期
VALIDITY=365
#subj
SUBJ="/CN=KafkaCA"
# 检查目录
if [ ! -d "/usr/ca/ssl" ]; thenmkdir -p /usr/ca/sslchmod 700 /usr/ca/ssl
ficd /usr/ca/ssl
# 检查文件是否已存在
if [ -f "$CA_CERT" ] || [ -f "$CA_KEY" ]; thenecho "错误: CA证书或私钥已存在,请先删除或备份现有文件"exit 1
fi#正式生成证书,以下内容可以不用调整
echo "=== 步骤 1: 生成自签名 CA ==="
openssl req -new -x509 \-keyout $CA_KEY \-out $CA_CERT \-days $VALIDITY \-nodes \-subj $SUBJ# 设置文件权限
chmod 600 "$CA_KEY"

有效命令其实就是最后一句,其他的是因为放在脚本中所以进行一些通用配置,方便复用

生成服务端和客户端证书

这个阶段包含流程图的2-7,在都用同一个CA的情况下,步骤7只需要执行一次,然后复制到所有需要用到的计算机中就可以。

生成keystore.jks和truststore.jks

#!/bin/bashset -e# === 配置部分 需要根据自己的实际情况进行调整===
ALIAS="kafka"
KEYSTORE="keystore.jks"
TRUSTSTORE="truststore.jks"
CSR="sign.csr"
SIGN="signed.crt"
STOREPASS="123456"
KEYPASS="654321"
DNAME="CN=localhost, OU=IT, O=Kafka, L=City, S=State, C=CN"
VALIDITY=365
#上一步生成的根证书
CA_CERT="/usr/ca/ssl/ca-cert.pem"
CA_KEY="/usr/ca/ssl/ca-key.pem"
# 检查目录
if [ ! -d "/usr/ca/ssl" ]; thenecho "错误: 目录 /usr/ca/ssl 不存在"exit 1
ficd /usr/ca/ssl#正式生成各个证书,以下内容可以不用调整
echo "=== 步骤 1: 生成Keystore证书和私钥 ==="
keytool -genkeypair \-alias $ALIAS \-keyalg RSA \-keysize 2048 \-validity $VALIDITY \-keystore $KEYSTORE \-storepass $STOREPASS \-keypass $KEYPASS \-dname "$DNAME"echo "=== 步骤 2: 生成证书签名请求 (CSR) ==="
keytool -keystore $KEYSTORE \-alias $ALIAS \-certreq \-file $CSR \-storepass $STOREPASSecho "=== 步骤 3: 使用 CA 签名证书 ==="
openssl x509 -req \-CA $CA_CERT \-CAkey $CA_KEY \-in $CSR \-out $SIGN \-days $VALIDITY \-CAcreateserialecho "=== 步骤 4: 将 CA 根证书导入Keystore ==="
keytool -keystore $KEYSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -nopromptecho "=== 步骤 5: 将签名证书导入 Keystore ==="
keytool -keystore $KEYSTORE \-alias $ALIAS \-import -file $SIGN \-storepass $STOREPASS -noprompt#使用同一个CA证书,在多个计算机使用时,下面这步可以只执行一次,每次新生成也不影响
echo "=== 步骤 6: 创建Truststore(导入 CA 根证书) ==="
keytool -keystore $TRUSTSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -noprompt

辅助脚本

如果正在生成服务端证书,需要把相关证书配置到server.properties可以在上面脚本中增加一下内容:

#顺便生成后续Kafka要配置的内容,直接复制到server.properties文件
cat <<EOF############ SSL 配置 - server.properties 中添加 ############listeners=SSL://:9092
#下面的localhost需要改成ip,否则只有自己能连上
advertised.listeners=SSL://localhost:9092
security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=$(pwd)/$KEYSTORE
ssl.keystore.password=$STOREPASS
ssl.key.password=$KEYPASS
ssl.truststore.location=$(pwd)/$TRUSTSTORE
ssl.truststore.password=$STOREPASS
#这里配置成双向认证
ssl.client.auth=required
# 不验证客户端证书
#ssl.client.auth=none  #############################################################EOF

如果是为客户端生成证书,可以增加一下内容:

#如果是客户端就增加使用以下脚本生成的文件去执行Kafka相关命令
echo "=== 创建 Kafka 客户端配置文件 client.properties ==="
cat <<EOF > client.properties
security.protocol=SSL
ssl.truststore.location=$(pwd)/$TRUSTSTORE
ssl.truststore.password=$STOREPASS
ssl.endpoint.identification.algorithm=
group.id=test-group
#如果单向认证就不用添加下面三个配置
ssl.keystore.location=$(pwd)/$KEYSTORE
ssl.keystore.password=$STOREPASS
ssl.key.password=$KEYPASS
EOF

单独生成truststore.jks

#!/bin/bashset -e
# === 配置部分 需要根据自己的实际情况进行调整===
TRUSTSTORE="truststore.jks"
STOREPASS="123456"
#信任的根证书
CA_CERT="/usr/ca/ssl/ca-cert.pem"
echo "=== 步骤 6: 创建Truststore(导入 CA 根证书) ==="
keytool -keystore $TRUSTSTORE \-alias CARoot \-import -file $CA_CERT \-storepass $STOREPASS -noprompt

环境配置

hosts文件

文件位置:
Windows hosts:C:\Windows\System32\drivers\etc\hosts
Linux hosts:/etc/hosts
添加内容:公网IP kafka
如果是本机使用也可以直接用内网IP

kafka server.properties配置ssl

把生成jks那步输出的内容增加到server.properties中就可以,如果不是第一次配置,就只增加自己需要配置的内容即可。或者没有增加辅助脚本的话,直接把下面内容中keystore和truststore的位置手动替换一下就行:

#下面两项原来如果已经配置过就不要重复
listeners=SSL://:9092
#下面的localhost需要改成ip,否则只有自己能连上
advertised.listeners=SSL://localhost:9092security.inter.broker.protocol=SSL
ssl.endpoint.identification.algorithm=
ssl.keystore.location=这里替换成keystore的路径
ssl.keystore.password=keystore的密码
ssl.key.password=key的密码
ssl.truststore.location=这里替换成truststore的路径
ssl.truststore.password=truststore的密码#这里配置成双向认证
ssl.client.auth=required
# 不验证客户端证书
#ssl.client.auth=none  

启动kafka

cd到kafka安装路径下可以直接执行下面命令,或者使用绝对路径
/bin/zookeeper-server-start.sh -daemon /config/zookeeper.properties
/bin/kafka-server-start.sh -daemon /config/server.properties

kafka基础操作

client.properties的路径要换成自己的
创建topic

 bin/kafka-topics.sh   --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic test-topic  --command-config /usr/local/kafka/ssl/client.properties

查看topic

 bin/kafka-topics.sh --list --bootstrap-server kafka:9092  --command-config /usr/ca/ssl-server/ssl-client/client-ssl.properties

生成消息

 bin/kafka-console-producer.sh  --bootstrap-server  kafka:9092 --topic test-topic --producer.config /usr/local/kafka/ssl/client.properties

springboot集成

准备工作

1、生成客户端keystore.jks和truststore.jks
这时候spring程序作为客户端,所以需要为他生成一个keystore.jks和truststore.jks,然后放到项目或别的位置,配置到项目中用来认证。如果只是暂时测试一下是否能连通,也可以讨巧,直接用服务端的同一套keystore.jks和truststore.jks,但是这样的操作不能用到正式中。

2、修改项目所在计算机的hosts文件

需要配置的文件

pom:引入kafka依赖,有说需要版本对应的,我直接没有指定版本也是可以的

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

yml:增加Kafka配置项,当然也可以放到代码里

spring:kafka:#kafka代理地址bootstrap-servers: kafka:9092ssl:protocol: SSL###服务端证书配置的时候设置的密码#broke对client的认证,ssl.client.auth=required时需要key的配置key-store-location: classpath:/certs/keystore.jkskey-store-password: 123456key-password: 654321#client对broke的认证trust-store-password: 123456trust-store-location: classpath:/certs/truststore.jkskey-store-type: JKS#不验证主机名  properties:ssl:endpoint:identification:algorithm: ''security:protocol: SSL#认证的配置就到这里了,下面的配置可以根据自己的习惯配置#消息发送失败重试次数producer:retries: 0# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: consumer-dev-groupauto-offset-reset: earliestenable-auto-commit: falsemax-poll-records: 30# 指定消息key和消息体的编解码方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:ack-mode: manualtype: batchconcurrency: 1#kafka监听的topic和group
report:kafka:#接收kafka消息的topic和groupproducerTopic: test-topicreportGroup: test-group

开始消费

如果需要生成可以找别的教程,我直接通过命令进行生产的,只是写了一个消费(后面有时间可以补上)


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;import java.util.List;@Slf4j
@Component
public class Consumer {@KafkaListener(topics = "${report.kafka.producerTopic}", groupId = "${report.kafka.reportGroup}")public void reportConsumer(List<ConsumerRecord<String, String>> consumerRecords, Acknowledgment ack) {log.info("---------- 从Kafka上接收消息 -----");for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {log.info("offset是" + consumerRecord.offset() + "," + consumerRecord.partition());String value = consumerRecord.value();System.out.println("接到的内容:"+value);//具体的业务处理逻辑可以写在后面}// 手动批量ackack.acknowledge();log.info("kafka提交成功");}}

相关文章:

单机Kafka配置ssl并在springboot使用

目录 SSL证书生成根证书生成服务端和客户端证书生成keystore.jks和truststore.jks辅助脚本单独生成truststore.jks 环境配置hosts文件kafka server.properties配置ssl 启动kafkakafka基础操作springboot集成准备工作需要配置的文件开始消费 SSL证书 证书主要包含两大类&#x…...

《棒球特长生》棒球升学途径·棒球1号位

美国大学棒球体系 | U.S. College Baseball System 美国大学棒球主要通过 NCAA&#xff08;全国大学体育协会&#xff09;和 NAIA&#xff08;全美校际体育协会&#xff09;组织&#xff0c;分为三个级别&#xff1a; NCAA Division I&#xff1a;竞技水平最高&#xff0c;提…...

JavaScript的call和apply

在 JavaScript 中&#xff0c;.call() 和 .apply() 都是 Function 原型上的方法&#xff0c;用于改变函数执行时的上下文对象&#xff08;即 this 指向&#xff09;&#xff0c;它们的区别仅在于参数传递的形式不同。下面结合几个常见场景&#xff0c;说明它们的实际应用。 1. …...

DiT、 U-Net 与自回归模型的优势

DiT 相对于 U-Net 的优势 全局自注意力 vs. 局部卷积 U-Net 依赖卷积和池化/上采样来逐层扩大感受野&#xff0c;捕捉全局信息需要堆叠很多层或借助跳跃连接&#xff08;skip connections&#xff09;。DiT 在每个分辨率阶段都用 Transformer 模块&#xff08;多头自注意力 ML…...

开源 FcDesigner 表单设计器组件事件详解

FcDesigner 是一款基于Vue的开源低代码可视化表单设计器工具&#xff0c;通过数据驱动表单渲染。可以通过拖拽的方式快速创建表单&#xff0c;提高开发者对表单的开发效率&#xff0c;节省开发者的时间。并广泛应用于在政务系统、OA系统、ERP系统、电商系统、流程管理等领域。 …...

Teigha应用——解析CAD文件(DWG格式)Teigha在CAD C#二次开发中的基本应用

Teigha是一款专为开发者设计的工具&#xff0c;其核心技术在于强大的API和丰富的功能集&#xff0c;提供了一系列工具和方法&#xff0c;使开发者能够轻松地读取、解析和操作DWG文件。它支持多种操作系统&#xff0c;能在处理大型DWG文件时保持高效性能&#xff0c;还可用于构建…...

C++23内存分配新特性:std::allocate_at_least

文章目录 一、背景与动机二、std::allocator::allocate_at_least的特性三、std::allocate_at_least的自由函数版本四、实际应用场景1. 动态容器的优化2. 自定义分配器 五、总结 在C23标准中&#xff0c; std::allocate_at_least和 std::allocator::allocate_at_least的引入为…...

JavaScript性能优化全景指南

JavaScript性能优化全景指南 Ⅰ. 加载性能优化 1.1 代码分割与懒加载 动态导入(ES2020) javascript // 路由级代码分割 const ProductPage () > import(/* webpackChunkName: "product" */ ./ProductPage.vue); // 交互驱动加载 document.querySelector(#char…...

04-jenkins学习之旅-java后端项目部署实践

1、创建被管理项目 2、构建流程说明 jenkins其实就是将服务部署拆分成了&#xff1a; 1、拉取代码(git) 2、打包编译 3、自定义脚本(jar复制、执行启动脚本) 4、部署成功后的一些通知等 3、demo配置 3.1、General 3.2 源码管理 添加用户名密码方式如下图 3.2.1 常见错误(r…...

基于Python flask 的豆瓣电影top250数据评分可视化

文章目录 基于Python flask 的豆瓣电影top250数据评分可视化项目简介项目结构效果展示源码获取 基于Python flask 的豆瓣电影top250数据评分可视化 博主介绍&#xff1a;✌安替-AnTi&#xff1a;CSDN博客专家、掘金/华为云//InfoQ等平台优质作者&#xff0c;硕士研究生毕业。专…...

Cat.4+WiFi6工业路由器介绍小体积大作用ER4200

ER42004G Cat.4WiFi6 工业路由器隶属于纵横智控ER系列&#xff0c;型号为ER4200&#xff0c;是一款坚固耐用、性能强大的网络设备&#xff0c;专为应对严苛环境而设计。它采用工业级品质设计&#xff0c;集成 4G Cat.4 全网络支持和 WiFi6 技术&#xff0c;可在稳定性和性能至关…...

大模型应用开发第三讲:大模型是Agent的“大脑”,提供通用推理能力(如GPT-4、Claude 3)

大模型应用开发第三讲&#xff1a;大模型是Agent的“大脑”&#xff0c;提供通用推理能力&#xff08;如GPT-4、Claude 3&#xff09; 资料取自《大模型应用开发&#xff1a;动手做AI Agent 》。 查看总目录&#xff1a;学习大纲 关于DeepSeek本地部署指南可以看下我之前写的…...

创建型模式之Abstract Factory(抽象工厂)

创建型模式之Abstract Factory&#xff08;抽象工厂&#xff09; 摘要&#xff1a; 本文介绍了抽象工厂模式&#xff08;Abstract Factory&#xff09;&#xff0c;它是一种创建型设计模式&#xff0c;提供了一种创建一系列相关对象的接口而无需指定具体类。文章通过手机工厂示…...

GitLab 18.0 正式发布,15.0 将不再受技术支持,须升级【一】

GitLab 是一个全球知名的一体化 DevOps 平台&#xff0c;很多人都通过私有化部署 GitLab 来进行源代码托管。极狐GitLab 是 GitLab 在中国的发行版&#xff0c;专门为中国程序员服务。可以一键式部署极狐GitLab。 学习极狐GitLab 的相关资料&#xff1a; 极狐GitLab 官网极狐…...

【DeepSeek论文精读】12. DeepSeek-Prover-V2: 通过强化学习实现子目标分解的形式化数学推理

欢迎关注[【AIGC论文精读】](https://blog.csdn.net/youcans/category_12321605.html&#xff09;原创作品 【DeepSeek论文精读】1. 从 DeepSeek LLM 到 DeepSeek R1 【DeepSeek论文精读】10. DeepSeek-Coder-V2: 突破闭源模型在代码智能领域的障碍 【DeepSeek论文精读】12. De…...

字符串day7

344 反转字符串 字符串理论上也是一个数组&#xff0c;因此只需要用双指针即可 class Solution { public:void reverseString(vector<char>& s) {for(int i0,js.size()-1;i<j;i,j--){swap(s[i],s[j]);}} };541 反转字符串 自己实现一个反转从start到end的字符串…...

vue2中,codemirror编辑器的使用

交互说明 在编辑器中输入{时&#xff0c;会自动弹出选项弹窗&#xff0c;然后可以选值插入。 代码 父组件 <variable-editorv-model"content":variables"variables"placeholder"请输入模板内容..."blur"handleBlur" />data…...

FastAPI与MongoDB分片集群:异步数据路由与聚合优化

title: FastAPI与MongoDB分片集群:异步数据路由与聚合优化 date: 2025/05/26 16:04:31 updated: 2025/05/26 16:04:31 author: cmdragon excerpt: FastAPI与MongoDB分片集群集成实战探讨了分片集群的核心概念、Motor驱动配置技巧、分片数据路由策略、聚合管道高级应用、分片…...

Perl单元测试实战指南:从Test::Class入门到精通的完整方案

阅读原文 前言:为什么Perl开发者需要重视单元测试? "这段代码昨天还能运行,今天就出问题了!"——这可能是每位Perl开发者都经历过的噩梦。在没有充分测试覆盖的情况下,即使是微小的改动也可能导致系统崩溃。单元测试正是解决这一痛点的最佳实践,它能帮助我们在…...

强大的免费工具,集合了30+功能

今天给大家分享一款免费的绿色办公软件&#xff0c;它涵盖了自动任务、系统工具、文件工具、PDF 工具、OCR 图文识别、文字处理、电子表格这七个模块&#xff0c;多达 30 余项实用功能&#xff0c;堪称办公利器。 作者开发这款软件的初衷是为了解决日常办公中常见的痛点问题&am…...

从0开始学习R语言--Day11--主成分分析

主成分分析&#xff08;PCA&#xff09; PCA是一种降维技术&#xff0c;它把一堆相关的变量&#xff08;比如身高、体重、年龄&#xff09;转换成少数几个不相关的新变量&#xff08;叫“主成分”&#xff09;&#xff0c;这些新变量能最大程度保留原始数据的信息。 核心理念 …...

通用前端框架项目静态部署到Hugging Face Space的实践指南

背景介绍 在轻量级展示前端项目的场景中,Hugging Face Space 提供了一个便捷的静态托管平台。需求是将无后端服务的Vite的 Vue项目部署到Hugging Face Space 上。其实无论是基于Vite的Vue/React项目,还是使用Webpack构建的工程化方案,都可以通过两种方式将其部署到Space:自…...

AI辅助写作 从提笔难到高效创作的智能升级

你是否经历过面对空白文档头脑空白的绝望&#xff1f;是否为整理实验数据通宵达旦&#xff1f;在这个信息爆炸的时代&#xff0c;一种新型写作方式正悄悄改变知识工作者的创作模式—AI辅助写作。这种技术既不像科幻作品里的自动生成机器人&#xff0c;也非简单的文字模板&#…...

十一、Samba文件共享服务

目录 1、Samba介绍1.1、Samba概述1.2、Samba服务器的主要组成部分1.3、Samba的工作原理2、Samab服务器的安装与配置2.1、安装samba2.2、Samba主配置文件2.2.1、全局设置段[global]2.2.2、用户目录段[homes]2.2.3、配置文件检查工具3、示例3.1、需要用户验证的共享3.2、用户映射…...

医疗影像检测系统设计与实现

以下是一个基于YOLO系列模型的医疗影像检测系统实现及对比分析的详细技术文档。由于目前官方YOLOv11尚未发布,本文将基于YOLOv8架构设计改进型YOLOv11,并与YOLOv8、YOLOv5进行对比实验。全文包含完整代码实现及分析,字数超过6000字。 # 注意:本文代码需要Python 3.8+、PyT…...

11.13 LangGraph记忆机制解析:构建生产级AI Agent的关键技术

LangGraph 持久化与记忆:构建具备记忆能力的生产级 AI Agent 关键词:LangGraph 持久化, 多回合记忆, 单回合记忆, 检查点系统, 状态管理 1. 记忆机制的核心价值 在对话式 AI Agent 的开发中,记忆管理直接决定了用户体验的连贯性和智能性。LangGraph 通过 多回合记忆(Mult…...

C++23中std::span和std::basic_string_view可平凡复制提案解析

文章目录 一、引言二、相关概念解释2.1 平凡复制&#xff08;Trivially Copyable&#xff09;2.2 std::span2.3 std::basic_string_view 三、std::span和std::basic_string_view的应用场景3.1 std::span的应用场景3.2 std::basic_string_view的应用场景 四、P2251R1提案对std::…...

[yolov11改进系列]基于yolov11引入感受野注意力卷积RFAConv的python源码+训练源码

[RFAConv介绍] 1、RFAConv 在传统卷积操作中&#xff0c;每个感受野都使用相同的卷积核参数&#xff0c;无法区分不同位置的信息差异&#xff0c;这都限制了网络性能。此外&#xff0c;由于空间注意力以及现有空间注意力机制的局限性&#xff0c;虽然能够突出关键特征&#xf…...

Springboot引入Spring Cloud for AWS的配置中心(Parameter Store和Secrets)

问题 现在手上有一个老Spring2.5.15项目&#xff0c;需要使用AWS Parameter Store作为配置中心服务。 思路 引入这个Spring版本对应的Spring Cloud&#xff0c;然后再引入Spring Cloud AWS相关组件。然后&#xff0c;在AWS云上面准备好配置&#xff0c;然后&#xff0c;启动…...

打破云平台壁垒支持多层级JSON生成的MQTT网关技术解析

工业智能网关的上行通信以MQTT协议为核心&#xff0c;但在实际应用中&#xff0c;企业往往需要将数据同时或分场景接入多个公有云平台&#xff08;如华为云IoT、阿里云IoT、亚马逊AWS IoT&#xff09;&#xff0c;甚至私有化部署的第三方平台。为实现这一目标&#xff0c;网关需…...