Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费
证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。
证书处理:
- KeyStore 用于存储客户端的证书和私钥,用于客户端身份验证。
- TrustStore 用于存储受信任的根证书或证书链,用于验证服务器的身份。

合并一下证书:
cat your_cert.pem your_key.key > test.pem

- 合并证书和私钥为一个 PKCS12 文件:
cat your_cert.pem your_key.key > combined.pem
openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias
2,将 PKCS12 文件导入到 Java KeyStore 中:
keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS
要生成 truststore.jks 文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。
下面是生成 truststore.jks 的步骤:
-
获取服务器的根证书或证书链。您可以使用之前提到的
openssl s_client命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts
-
将根证书或证书链保存为
.pem文件。 -
使用
keytool命令将根证书或证书链导入到truststore.jks文件中:keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks
项目集成:
maven集成:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.5.RELEASE</version></dependency>
nacos配置:
spring:kafka:bootstrap-servers: SSL://connectedca.com:443 ##换成你自己的连接ssl:protocol: TLS
###3这三个密码是你证书配置的时候设置的密码trust-store-password: a123456key-store-password: a123456key-password: a123456consumer:group-id: producer:topic: *.event ##换成你自己的topic
核心配置:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class KafkaConfiguration {@AutowiredC3ConfigProperties c3ConfigProperties;@Autowiredprivate KafkaConfig kafkaProperties;@Autowiredprivate ResourceLoader resourceLoader;@Beanpublic KafkaAdmin kafkaAdmin() {Map <String, Object> configs = new HashMap <>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());return new KafkaAdmin(configs);}@Beanpublic DefaultKafkaConsumerFactory <String, String> consumerFactory() {Map <String, Object> consumerConfig = new HashMap <>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 设置值的反序列化器为 ErrorHandlingDeserializer2,并配置类型信息consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);consumerConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // 启用类型信息头consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");consumerConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "*.KafkaC3MsgListener"); // 设置默认类型信息consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*.KafkaC3MsgListener"); // 替换为你的实际包名String pemUrl = "";String csrUrl = "";if (c3ConfigProperties.getEnvironment().equals("uat")) {pemUrl = "file/uat/kafka/client.jks";csrUrl = "file/uat/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("pre")) {pemUrl = "file/pre/kafka/client.jks";csrUrl = "file/pre/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("prod")) {pemUrl = "file/prod/kafka/client.jks";csrUrl = "file/prod/kafka/truststore.jks";}try {// 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载Resource pemResource = resourceLoader.getResource("classpath:"+pemUrl);Resource csrResource = resourceLoader.getResource("classpath:"+csrUrl);
// 获取证书文件的路径String keyStorePath = pemResource.getFile().getAbsolutePath();String trustStorePath = csrResource.getFile().getAbsolutePath();consumerConfig.put("ssl.keystore.location", keyStorePath);consumerConfig.put("ssl.truststore.location", trustStorePath);}catch (Exception e){log.error("Resource file error:{}",e.getMessage());}consumerConfig.put("security.protocol", "SSL");consumerConfig.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());consumerConfig.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());consumerConfig.put("ssl.key.password", kafkaProperties.getKeyPassword());return new DefaultKafkaConsumerFactory <>(consumerConfig);}@Beanpublic ConcurrentKafkaListenerContainerFactory <String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory <String, String> factory = new ConcurrentKafkaListenerContainerFactory <>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 设置并发消费者数量factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 错误处理器return factory;}@Beanpublic KafkaC3MsgListener kafkaC3MsgListener() {return new KafkaC3MsgListener();}}
注入配置:
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Data
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.producer.topic}")private String topic;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-password}")private String keyStorePassword;@Value("${spring.kafka.ssl.key-password}")private String keyPassword;}
能够看到这个配置就成功了表示:

然后在监听处理消息即可
————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)
相关文章:
Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费
证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。 证书处理: KeyStore 用于存储客户端的证…...
spark 实验二 RDD编程初级实践
目录 一. pyspark交互式编程示例(学生选课成绩统计) 该系总共有多少学生; 该系DataBase课程共有多少人选修; 各门课程的平均分是多少; 使用累加器计算共有多少人选了DataBase这门课。 二.编写独立应用程序实现数…...
【MySQL】not in遇上null的坑
今天遇到一个问题: 1、当 in 内的字段包含 null 的时候,正常过滤; 2、当 not in 内的字段包含 null 的时候,不能正常过滤,即使满足条件,最终结果也为 空。 测试如下: select * from emp e;当…...
鸿蒙4.0-DevEco Studio界面工程
DevEco Studio界面工程 DevEco Studio 下载与第一个工程新建的第一个工程界面回到Project工程结构来看 DevEco Studio 下载与第一个工程 DevEco Studio 下载地址:点击跳转 https://developer.harmonyos.com/cn/develop/deveco-studio#download 学习课堂以及文档地址…...
前端将html导出pdf文件解决分页问题
这是借鉴了qq_251025116大佬的解决方案并优化升级完成的,原文链接 1.安装依赖 npm install jspdf html2canvas2.使用方法 import htmlToPdffrom ./index.jsconst suc () > {message.success(success);};//记得在需要打印的div上面添加 idlet dom document.que…...
openssl3.2 - exp - 产生随机数
文章目录 openssl3.2 - exp - 产生随机数概述笔记END openssl3.2 - exp - 产生随机数 概述 要用到openssl产生的随机数, 查了资料. 如果用命令行产生随机数, 如下: openssl rand -hex -num 6 48bfd3a64f54单步跟进去, 看到主要就是调用了一个RAND_bytes(), 没其他了. 官方说…...
【三两波折】char *foo[]和char(*foo)[]有何不同?
1、先谈优先级 最高级别 —— 有四个,他们并不像运算符: []数组下标左到右结合()用于(表达式) or 函数名(形参表)左到右结合.读取结构体成员左到右结合->读取结构体成员(通过指针)左到右结合 第二级别…...
k8s(kubernetes)怎么查看pod服务对应哪些docker容器
Kubernetes(k8s)中的Pod是一组共享网络和存储资源的容器集合。每个Pod都包含一个或多个Docker容器,这些容器共享网络命名空间和存储卷,并在同一主机上运行。因此,可以将Pod视为一组紧密相关的Docker容器的逻辑主机&…...
[2023年]-hadoop面试真题(二)
[2023年]-hadoop面试真题(一) (北京) Maptask的个数由什么决定?(北京) 如何判定一个job的map和reduce的数量 ?(北京) MR中Shuffle过程 ?(北京) MR中处理数据流程 ?(…...
蓝桥杯备战刷题-滑动窗口
今天给大家带来的是滑动窗口的类型题,都是十分经典的。 1,无重复字符的最长子串 看例三,我们顺便来说一下子串和子序列的含义 子串是从字符串里面抽出来的一部分,不可以有间隔,顺序也不能打乱。 子序列也是从字符串里…...
LLM(十一)| Claude 3:Anthropic发布最新超越GPT-4大模型
2024年3月4日,Anthropic发布最新多模态大模型:Claude 3系列,共有Haiku、Sonnet和Opus三个版本。 Opus在研究生水平专家推理、基础数学、本科水平专家知识、代码等10个维度,超过OpenAI的GPT-4。 Haiku模型更注重效率,能…...
20-Java备忘录模式 ( Memento Pattern )
Java备忘录模式 摘要实现范例 备忘录模式(Memento Pattern)保存一个对象的某个状态,以便在适当的时候恢复对象 备忘录模式属于行为型模式 摘要 1. 意图 在不破坏封装性的前提下,捕获一个对象的内部状态,并在该对…...
整合生成型AI战略:从宏观思维到小步实践
“整合生成型AI战略:从宏观思维到小步实践” 在这篇文章中,我们探讨了将生成型AI和大型语言模型融入企业核心业务的战略开发方法。我们的方法基于敏捷开发原则,技术专家和数据科学家需要采纳商业思维,而执行官则需理解生成型AI和…...
个人博客系列-后端项目-用户验证(5)
介绍 创建系统管理app,用于管理系统的用户,角色,权限,登录等功能,项目中将使用django-rest_framework进行用户认证和权限解析。这里将完成用户认证 用户验证 rest_framework.authentication模块中的认证类ÿ…...
css3中nth-child属性作用及用法剖析
hello宝子们...我们是艾斯视觉擅长ui设计和前端开发10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩! 标题:CSS3中nth-child属性作用及用法剖析 摘要:CSS3中的nth-child选择器允许我们根据元素位置来定位特定的元素…...
okHttp MediaType MIME格式详解
一、介绍 我们在做数据上传时,经常会用到Okhttp的开源库,okhttp开源库也遵循html提交的MIME数据格式。 所以我们经常会看到applicaiton/json这样的格式在传。 但是如果涉及到其他文件等就需要详细的数据格式,否则服务端无法解析 二、okHt…...
跨境电商三大趋势
跨境电商有着不断发展的三大趋势: 个性化定制:随着消费者需求的不断变化和个性化定制的潮流,跨境电商平台开始提供更多的定制化服务。消费者可以根据自己的需求选择产品的款式、材料和设计,从而获得更加个性化的产品体验。 无界销…...
【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试
【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试 目录 【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试核心概念资源监控生命周期管理Cluster维护安全认证问题排查其他推荐超级课程: Docker快速入门到精通Kubernetes入门到大师通关课这些是我在准备CK...
Mysql数据库-基本表操作
1.表操作 创建表:CREATE TABLE table_name ( field1 datatype, field2 datatype, field3 datatype ) character set 字符集 collate 校验规则 engine 存储引擎; field 表示列名 datatype 表示列的类型 character set 字符集,如果没有指定字符集ÿ…...
OceanBase社区版单节点安装搭建(Docker)
OceanBase社区版单节点安装搭建(Docker) 文章目录 OceanBase社区版单节点安装搭建(Docker)一、环境检查及Docker配置1.1 安装docker1.2 配置docker镜像源 二、OB镜像下载三、obd部署单节点数据库四、创建业务租户、数据库、表4.1 …...
ComfyUI Impact Pack 安装后报错排查指南:从依赖缺失到解决方案
1. 遇到ComfyUI Impact Pack报错怎么办? 最近有不少朋友反馈,明明已经安装了ComfyUI Impact Pack插件,但运行时还是会出现"节点未找到"的报错提示。这种情况我遇到过好几次,刚开始也是一头雾水,后来慢慢摸索…...
从概念到应用:一文读懂概率密度函数与累积分布函数的联系与区别
1. 随机变量:理解概率分布的基础 概率密度函数(PDF)和累积分布函数(CDF)是统计学中描述随机变量分布的两个核心工具。要真正理解它们,我们得从随机变量这个基础概念说起。随机变量就像是一个数学魔术师&am…...
别再只调参了!用进化算法给DDPG当“外挂”,解决强化学习探索难题(附PyTorch代码)
进化算法与DDPG的协同进化:突破强化学习探索瓶颈的工程实践 在机器人控制、游戏AI等需要连续动作决策的场景中,深度确定性策略梯度算法(DDPG)因其出色的表现而广受欢迎。然而,许多工程师在实际项目中都会遇到这样的困…...
终极指南:如何用Bioicons免费矢量图标库快速制作专业科研图表
终极指南:如何用Bioicons免费矢量图标库快速制作专业科研图表 【免费下载链接】bioicons A library of free open source icons for science illustrations in biology and chemistry 项目地址: https://gitcode.com/gh_mirrors/bi/bioicons Bioicons是一个免…...
利用TIGRAMITE进行时间序列因果分析:从数据准备到可视化全流程
1. TIGRAMITE入门:时间序列因果分析利器 第一次接触TIGRAMITE是在分析气象数据时,当时需要找出温度、湿度、风速之间的因果关系链。这个Python包让我眼前一亮——它不仅能自动识别变量间的因果方向,还能精确捕捉时间滞后效应。TIGRAMITE基于…...
3分钟掌握艾尔登法环存档迁移:EldenRingSaveCopier终极指南
3分钟掌握艾尔登法环存档迁移:EldenRingSaveCopier终极指南 【免费下载链接】EldenRingSaveCopier 项目地址: https://gitcode.com/gh_mirrors/el/EldenRingSaveCopier 艾尔登法环存档管理是每位褪色者必须掌握的技能。面对存档损坏、设备更换或多角色管理的…...
如何用一款开源工具永久保存200+小说网站的内容?
如何用一款开源工具永久保存200小说网站的内容? 【免费下载链接】novel-downloader 一个可扩展的通用型小说下载器。 项目地址: https://gitcode.com/gh_mirrors/no/novel-downloader 在数字阅读时代,最令人不安的体验莫过于某天打开收藏夹&#…...
Spark单机模式入门:从安装到实战案例,一步步教你如何用Python玩转大数据处理
Spark单机模式实战指南:Python大数据处理从入门到精通 大数据处理已成为现代技术生态中不可或缺的一环,而Spark作为其中的佼佼者,以其卓越的性能和易用性赢得了广泛认可。对于Python开发者而言,Spark的单机模式提供了一个绝佳的起…...
掌握RDKit化学信息学工具:从分子计算到药物发现的完整实战指南
掌握RDKit化学信息学工具:从分子计算到药物发现的完整实战指南 【免费下载链接】rdkit The official sources for the RDKit library 项目地址: https://gitcode.com/gh_mirrors/rd/rdkit RDKit作为现代化学信息学的核心工具包,为化学家、药物研发…...
免费音频编辑神器Audacity:从零基础到专业级的完整指南
免费音频编辑神器Audacity:从零基础到专业级的完整指南 【免费下载链接】audacity Audio Editor 项目地址: https://gitcode.com/GitHub_Trending/au/audacity 在数字内容创作成为主流的今天,音频质量直接影响着作品的专业度和传播效果。然而&am…...
