当前位置: 首页 > news >正文

Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费

证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。

证书处理:

  • KeyStore 用于存储客户端的证书和私钥,用于客户端身份验证。
  • TrustStore 用于存储受信任的根证书或证书链,用于验证服务器的身份。

合并一下证书:

cat your_cert.pem your_key.key > test.pem

  1. 合并证书和私钥为一个 PKCS12 文件:
cat your_cert.pem your_key.key > combined.pem
openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias

2,将 PKCS12 文件导入到 Java KeyStore 中:

keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS

要生成 truststore.jks 文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。

下面是生成 truststore.jks 的步骤:

  1. 获取服务器的根证书或证书链。您可以使用之前提到的 openssl s_client 命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts

  2. 将根证书或证书链保存为 .pem 文件。

  3. 使用 keytool 命令将根证书或证书链导入到 truststore.jks 文件中:

    keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks

 

项目集成:

maven集成:

  <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.5.RELEASE</version></dependency>

nacos配置:

spring:kafka:bootstrap-servers: SSL://connectedca.com:443  ##换成你自己的连接ssl:protocol: TLS
###3这三个密码是你证书配置的时候设置的密码trust-store-password: a123456key-store-password: a123456key-password: a123456consumer:group-id: producer:topic: *.event  ##换成你自己的topic

核心配置:


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class KafkaConfiguration {@AutowiredC3ConfigProperties c3ConfigProperties;@Autowiredprivate KafkaConfig kafkaProperties;@Autowiredprivate ResourceLoader resourceLoader;@Beanpublic KafkaAdmin kafkaAdmin() {Map <String, Object> configs = new HashMap <>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());return new KafkaAdmin(configs);}@Beanpublic DefaultKafkaConsumerFactory <String, String> consumerFactory() {Map <String, Object> consumerConfig = new HashMap <>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 设置值的反序列化器为 ErrorHandlingDeserializer2,并配置类型信息consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);consumerConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // 启用类型信息头consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");consumerConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "*.KafkaC3MsgListener"); // 设置默认类型信息consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*.KafkaC3MsgListener"); // 替换为你的实际包名String pemUrl = "";String csrUrl = "";if (c3ConfigProperties.getEnvironment().equals("uat")) {pemUrl = "file/uat/kafka/client.jks";csrUrl = "file/uat/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("pre")) {pemUrl = "file/pre/kafka/client.jks";csrUrl = "file/pre/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("prod")) {pemUrl = "file/prod/kafka/client.jks";csrUrl = "file/prod/kafka/truststore.jks";}try {// 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载Resource pemResource = resourceLoader.getResource("classpath:"+pemUrl);Resource csrResource = resourceLoader.getResource("classpath:"+csrUrl);
// 获取证书文件的路径String keyStorePath = pemResource.getFile().getAbsolutePath();String trustStorePath = csrResource.getFile().getAbsolutePath();consumerConfig.put("ssl.keystore.location", keyStorePath);consumerConfig.put("ssl.truststore.location", trustStorePath);}catch (Exception e){log.error("Resource file error:{}",e.getMessage());}consumerConfig.put("security.protocol", "SSL");consumerConfig.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());consumerConfig.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());consumerConfig.put("ssl.key.password", kafkaProperties.getKeyPassword());return new DefaultKafkaConsumerFactory <>(consumerConfig);}@Beanpublic ConcurrentKafkaListenerContainerFactory <String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory <String, String> factory = new ConcurrentKafkaListenerContainerFactory <>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 设置并发消费者数量factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 错误处理器return factory;}@Beanpublic KafkaC3MsgListener kafkaC3MsgListener() {return new KafkaC3MsgListener();}}

注入配置:


import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Data
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.producer.topic}")private String topic;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-password}")private String keyStorePassword;@Value("${spring.kafka.ssl.key-password}")private String keyPassword;}

能够看到这个配置就成功了表示:

然后在监听处理消息即可

 ————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)

相关文章:

Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费

证书准备&#xff1a;springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换&#xff0c;因为kafka里面是jks 需要通过openssl进行转换。 证书处理&#xff1a; KeyStore 用于存储客户端的证…...

spark 实验二 RDD编程初级实践

目录 一. pyspark交互式编程示例&#xff08;学生选课成绩统计&#xff09; 该系总共有多少学生&#xff1b; 该系DataBase课程共有多少人选修&#xff1b; 各门课程的平均分是多少&#xff1b; 使用累加器计算共有多少人选了DataBase这门课。 二.编写独立应用程序实现数…...

【MySQL】not in遇上null的坑

今天遇到一个问题&#xff1a; 1、当 in 内的字段包含 null 的时候&#xff0c;正常过滤&#xff1b; 2、当 not in 内的字段包含 null 的时候&#xff0c;不能正常过滤&#xff0c;即使满足条件&#xff0c;最终结果也为 空。 测试如下&#xff1a; select * from emp e;当…...

鸿蒙4.0-DevEco Studio界面工程

DevEco Studio界面工程 DevEco Studio 下载与第一个工程新建的第一个工程界面回到Project工程结构来看 DevEco Studio 下载与第一个工程 DevEco Studio 下载地址&#xff1a;点击跳转 https://developer.harmonyos.com/cn/develop/deveco-studio#download 学习课堂以及文档地址…...

前端将html导出pdf文件解决分页问题

这是借鉴了qq_251025116大佬的解决方案并优化升级完成的&#xff0c;原文链接 1.安装依赖 npm install jspdf html2canvas2.使用方法 import htmlToPdffrom ./index.jsconst suc () > {message.success(success);};//记得在需要打印的div上面添加 idlet dom document.que…...

openssl3.2 - exp - 产生随机数

文章目录 openssl3.2 - exp - 产生随机数概述笔记END openssl3.2 - exp - 产生随机数 概述 要用到openssl产生的随机数, 查了资料. 如果用命令行产生随机数, 如下: openssl rand -hex -num 6 48bfd3a64f54单步跟进去, 看到主要就是调用了一个RAND_bytes(), 没其他了. 官方说…...

【三两波折】char *foo[]和char(*foo)[]有何不同?

1、先谈优先级 最高级别 —— 有四个&#xff0c;他们并不像运算符&#xff1a; []数组下标左到右结合()用于&#xff08;表达式&#xff09; or 函数名(形参表)左到右结合.读取结构体成员左到右结合->读取结构体成员&#xff08;通过指针&#xff09;左到右结合 第二级别…...

k8s(kubernetes)怎么查看pod服务对应哪些docker容器

Kubernetes&#xff08;k8s&#xff09;中的Pod是一组共享网络和存储资源的容器集合。每个Pod都包含一个或多个Docker容器&#xff0c;这些容器共享网络命名空间和存储卷&#xff0c;并在同一主机上运行。因此&#xff0c;可以将Pod视为一组紧密相关的Docker容器的逻辑主机&…...

[2023年]-hadoop面试真题(二)

[2023年]-hadoop面试真题(一) &#xff08;北京&#xff09; Maptask的个数由什么决定?&#xff08;北京&#xff09; 如何判定一个job的map和reduce的数量 ?&#xff08;北京&#xff09; MR中Shuffle过程 ?&#xff08;北京&#xff09; MR中处理数据流程 ?&#xff08;…...

蓝桥杯备战刷题-滑动窗口

今天给大家带来的是滑动窗口的类型题&#xff0c;都是十分经典的。 1&#xff0c;无重复字符的最长子串 看例三&#xff0c;我们顺便来说一下子串和子序列的含义 子串是从字符串里面抽出来的一部分&#xff0c;不可以有间隔&#xff0c;顺序也不能打乱。 子序列也是从字符串里…...

LLM(十一)| Claude 3:Anthropic发布最新超越GPT-4大模型

2024年3月4日&#xff0c;Anthropic发布最新多模态大模型&#xff1a;Claude 3系列&#xff0c;共有Haiku、Sonnet和Opus三个版本。 Opus在研究生水平专家推理、基础数学、本科水平专家知识、代码等10个维度&#xff0c;超过OpenAI的GPT-4。 Haiku模型更注重效率&#xff0c;能…...

20-Java备忘录模式 ( Memento Pattern )

Java备忘录模式 摘要实现范例 备忘录模式&#xff08;Memento Pattern&#xff09;保存一个对象的某个状态&#xff0c;以便在适当的时候恢复对象 备忘录模式属于行为型模式 摘要 1. 意图 在不破坏封装性的前提下&#xff0c;捕获一个对象的内部状态&#xff0c;并在该对…...

整合生成型AI战略:从宏观思维到小步实践

“整合生成型AI战略&#xff1a;从宏观思维到小步实践” 在这篇文章中&#xff0c;我们探讨了将生成型AI和大型语言模型融入企业核心业务的战略开发方法。我们的方法基于敏捷开发原则&#xff0c;技术专家和数据科学家需要采纳商业思维&#xff0c;而执行官则需理解生成型AI和…...

个人博客系列-后端项目-用户验证(5)

介绍 创建系统管理app&#xff0c;用于管理系统的用户&#xff0c;角色&#xff0c;权限&#xff0c;登录等功能&#xff0c;项目中将使用django-rest_framework进行用户认证和权限解析。这里将完成用户认证 用户验证 rest_framework.authentication模块中的认证类&#xff…...

css3中nth-child属性作用及用法剖析

hello宝子们...我们是艾斯视觉擅长ui设计和前端开发10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩! 标题&#xff1a;CSS3中nth-child属性作用及用法剖析 摘要&#xff1a;CSS3中的nth-child选择器允许我们根据元素位置来定位特定的元素…...

okHttp MediaType MIME格式详解

一、介绍 我们在做数据上传时&#xff0c;经常会用到Okhttp的开源库&#xff0c;okhttp开源库也遵循html提交的MIME数据格式。 所以我们经常会看到applicaiton/json这样的格式在传。 但是如果涉及到其他文件等就需要详细的数据格式&#xff0c;否则服务端无法解析 二、okHt…...

跨境电商三大趋势

跨境电商有着不断发展的三大趋势&#xff1a; 个性化定制&#xff1a;随着消费者需求的不断变化和个性化定制的潮流&#xff0c;跨境电商平台开始提供更多的定制化服务。消费者可以根据自己的需求选择产品的款式、材料和设计&#xff0c;从而获得更加个性化的产品体验。 无界销…...

【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试

【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试 目录 【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试核心概念资源监控生命周期管理Cluster维护安全认证问题排查其他推荐超级课程: Docker快速入门到精通Kubernetes入门到大师通关课这些是我在准备CK...

Mysql数据库-基本表操作

1.表操作 创建表&#xff1a;CREATE TABLE table_name ( field1 datatype, field2 datatype, field3 datatype ) character set 字符集 collate 校验规则 engine 存储引擎; field 表示列名 datatype 表示列的类型 character set 字符集&#xff0c;如果没有指定字符集&#xff…...

OceanBase社区版单节点安装搭建(Docker)

OceanBase社区版单节点安装搭建&#xff08;Docker&#xff09; 文章目录 OceanBase社区版单节点安装搭建&#xff08;Docker&#xff09;一、环境检查及Docker配置1.1 安装docker1.2 配置docker镜像源 二、OB镜像下载三、obd部署单节点数据库四、创建业务租户、数据库、表4.1 …...

ComfyUI Impact Pack 安装后报错排查指南:从依赖缺失到解决方案

1. 遇到ComfyUI Impact Pack报错怎么办&#xff1f; 最近有不少朋友反馈&#xff0c;明明已经安装了ComfyUI Impact Pack插件&#xff0c;但运行时还是会出现"节点未找到"的报错提示。这种情况我遇到过好几次&#xff0c;刚开始也是一头雾水&#xff0c;后来慢慢摸索…...

从概念到应用:一文读懂概率密度函数与累积分布函数的联系与区别

1. 随机变量&#xff1a;理解概率分布的基础 概率密度函数&#xff08;PDF&#xff09;和累积分布函数&#xff08;CDF&#xff09;是统计学中描述随机变量分布的两个核心工具。要真正理解它们&#xff0c;我们得从随机变量这个基础概念说起。随机变量就像是一个数学魔术师&am…...

别再只调参了!用进化算法给DDPG当“外挂”,解决强化学习探索难题(附PyTorch代码)

进化算法与DDPG的协同进化&#xff1a;突破强化学习探索瓶颈的工程实践 在机器人控制、游戏AI等需要连续动作决策的场景中&#xff0c;深度确定性策略梯度算法&#xff08;DDPG&#xff09;因其出色的表现而广受欢迎。然而&#xff0c;许多工程师在实际项目中都会遇到这样的困…...

终极指南:如何用Bioicons免费矢量图标库快速制作专业科研图表

终极指南&#xff1a;如何用Bioicons免费矢量图标库快速制作专业科研图表 【免费下载链接】bioicons A library of free open source icons for science illustrations in biology and chemistry 项目地址: https://gitcode.com/gh_mirrors/bi/bioicons Bioicons是一个免…...

利用TIGRAMITE进行时间序列因果分析:从数据准备到可视化全流程

1. TIGRAMITE入门&#xff1a;时间序列因果分析利器 第一次接触TIGRAMITE是在分析气象数据时&#xff0c;当时需要找出温度、湿度、风速之间的因果关系链。这个Python包让我眼前一亮——它不仅能自动识别变量间的因果方向&#xff0c;还能精确捕捉时间滞后效应。TIGRAMITE基于…...

3分钟掌握艾尔登法环存档迁移:EldenRingSaveCopier终极指南

3分钟掌握艾尔登法环存档迁移&#xff1a;EldenRingSaveCopier终极指南 【免费下载链接】EldenRingSaveCopier 项目地址: https://gitcode.com/gh_mirrors/el/EldenRingSaveCopier 艾尔登法环存档管理是每位褪色者必须掌握的技能。面对存档损坏、设备更换或多角色管理的…...

如何用一款开源工具永久保存200+小说网站的内容?

如何用一款开源工具永久保存200小说网站的内容&#xff1f; 【免费下载链接】novel-downloader 一个可扩展的通用型小说下载器。 项目地址: https://gitcode.com/gh_mirrors/no/novel-downloader 在数字阅读时代&#xff0c;最令人不安的体验莫过于某天打开收藏夹&#…...

Spark单机模式入门:从安装到实战案例,一步步教你如何用Python玩转大数据处理

Spark单机模式实战指南&#xff1a;Python大数据处理从入门到精通 大数据处理已成为现代技术生态中不可或缺的一环&#xff0c;而Spark作为其中的佼佼者&#xff0c;以其卓越的性能和易用性赢得了广泛认可。对于Python开发者而言&#xff0c;Spark的单机模式提供了一个绝佳的起…...

掌握RDKit化学信息学工具:从分子计算到药物发现的完整实战指南

掌握RDKit化学信息学工具&#xff1a;从分子计算到药物发现的完整实战指南 【免费下载链接】rdkit The official sources for the RDKit library 项目地址: https://gitcode.com/gh_mirrors/rd/rdkit RDKit作为现代化学信息学的核心工具包&#xff0c;为化学家、药物研发…...

免费音频编辑神器Audacity:从零基础到专业级的完整指南

免费音频编辑神器Audacity&#xff1a;从零基础到专业级的完整指南 【免费下载链接】audacity Audio Editor 项目地址: https://gitcode.com/GitHub_Trending/au/audacity 在数字内容创作成为主流的今天&#xff0c;音频质量直接影响着作品的专业度和传播效果。然而&am…...