Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费
证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。
证书处理:
- KeyStore 用于存储客户端的证书和私钥,用于客户端身份验证。
- TrustStore 用于存储受信任的根证书或证书链,用于验证服务器的身份。
合并一下证书:
cat your_cert.pem your_key.key > test.pem
- 合并证书和私钥为一个 PKCS12 文件:
cat your_cert.pem your_key.key > combined.pem
openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias
2,将 PKCS12 文件导入到 Java KeyStore 中:
keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS
要生成 truststore.jks
文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。
下面是生成 truststore.jks
的步骤:
-
获取服务器的根证书或证书链。您可以使用之前提到的
openssl s_client
命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts -
将根证书或证书链保存为
.pem
文件。 -
使用
keytool
命令将根证书或证书链导入到truststore.jks
文件中:keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks
项目集成:
maven集成:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.5.RELEASE</version></dependency>
nacos配置:
spring:kafka:bootstrap-servers: SSL://connectedca.com:443 ##换成你自己的连接ssl:protocol: TLS
###3这三个密码是你证书配置的时候设置的密码trust-store-password: a123456key-store-password: a123456key-password: a123456consumer:group-id: producer:topic: *.event ##换成你自己的topic
核心配置:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class KafkaConfiguration {@AutowiredC3ConfigProperties c3ConfigProperties;@Autowiredprivate KafkaConfig kafkaProperties;@Autowiredprivate ResourceLoader resourceLoader;@Beanpublic KafkaAdmin kafkaAdmin() {Map <String, Object> configs = new HashMap <>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());return new KafkaAdmin(configs);}@Beanpublic DefaultKafkaConsumerFactory <String, String> consumerFactory() {Map <String, Object> consumerConfig = new HashMap <>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 设置值的反序列化器为 ErrorHandlingDeserializer2,并配置类型信息consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);consumerConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // 启用类型信息头consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");consumerConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "*.KafkaC3MsgListener"); // 设置默认类型信息consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*.KafkaC3MsgListener"); // 替换为你的实际包名String pemUrl = "";String csrUrl = "";if (c3ConfigProperties.getEnvironment().equals("uat")) {pemUrl = "file/uat/kafka/client.jks";csrUrl = "file/uat/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("pre")) {pemUrl = "file/pre/kafka/client.jks";csrUrl = "file/pre/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("prod")) {pemUrl = "file/prod/kafka/client.jks";csrUrl = "file/prod/kafka/truststore.jks";}try {// 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载Resource pemResource = resourceLoader.getResource("classpath:"+pemUrl);Resource csrResource = resourceLoader.getResource("classpath:"+csrUrl);
// 获取证书文件的路径String keyStorePath = pemResource.getFile().getAbsolutePath();String trustStorePath = csrResource.getFile().getAbsolutePath();consumerConfig.put("ssl.keystore.location", keyStorePath);consumerConfig.put("ssl.truststore.location", trustStorePath);}catch (Exception e){log.error("Resource file error:{}",e.getMessage());}consumerConfig.put("security.protocol", "SSL");consumerConfig.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());consumerConfig.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());consumerConfig.put("ssl.key.password", kafkaProperties.getKeyPassword());return new DefaultKafkaConsumerFactory <>(consumerConfig);}@Beanpublic ConcurrentKafkaListenerContainerFactory <String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory <String, String> factory = new ConcurrentKafkaListenerContainerFactory <>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 设置并发消费者数量factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 错误处理器return factory;}@Beanpublic KafkaC3MsgListener kafkaC3MsgListener() {return new KafkaC3MsgListener();}}
注入配置:
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Data
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.producer.topic}")private String topic;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-password}")private String keyStorePassword;@Value("${spring.kafka.ssl.key-password}")private String keyPassword;}
能够看到这个配置就成功了表示:
然后在监听处理消息即可
————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)
相关文章:

Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费
证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。 证书处理: KeyStore 用于存储客户端的证…...

spark 实验二 RDD编程初级实践
目录 一. pyspark交互式编程示例(学生选课成绩统计) 该系总共有多少学生; 该系DataBase课程共有多少人选修; 各门课程的平均分是多少; 使用累加器计算共有多少人选了DataBase这门课。 二.编写独立应用程序实现数…...

【MySQL】not in遇上null的坑
今天遇到一个问题: 1、当 in 内的字段包含 null 的时候,正常过滤; 2、当 not in 内的字段包含 null 的时候,不能正常过滤,即使满足条件,最终结果也为 空。 测试如下: select * from emp e;当…...

鸿蒙4.0-DevEco Studio界面工程
DevEco Studio界面工程 DevEco Studio 下载与第一个工程新建的第一个工程界面回到Project工程结构来看 DevEco Studio 下载与第一个工程 DevEco Studio 下载地址:点击跳转 https://developer.harmonyos.com/cn/develop/deveco-studio#download 学习课堂以及文档地址…...

前端将html导出pdf文件解决分页问题
这是借鉴了qq_251025116大佬的解决方案并优化升级完成的,原文链接 1.安装依赖 npm install jspdf html2canvas2.使用方法 import htmlToPdffrom ./index.jsconst suc () > {message.success(success);};//记得在需要打印的div上面添加 idlet dom document.que…...

openssl3.2 - exp - 产生随机数
文章目录 openssl3.2 - exp - 产生随机数概述笔记END openssl3.2 - exp - 产生随机数 概述 要用到openssl产生的随机数, 查了资料. 如果用命令行产生随机数, 如下: openssl rand -hex -num 6 48bfd3a64f54单步跟进去, 看到主要就是调用了一个RAND_bytes(), 没其他了. 官方说…...
【三两波折】char *foo[]和char(*foo)[]有何不同?
1、先谈优先级 最高级别 —— 有四个,他们并不像运算符: []数组下标左到右结合()用于(表达式) or 函数名(形参表)左到右结合.读取结构体成员左到右结合->读取结构体成员(通过指针)左到右结合 第二级别…...
k8s(kubernetes)怎么查看pod服务对应哪些docker容器
Kubernetes(k8s)中的Pod是一组共享网络和存储资源的容器集合。每个Pod都包含一个或多个Docker容器,这些容器共享网络命名空间和存储卷,并在同一主机上运行。因此,可以将Pod视为一组紧密相关的Docker容器的逻辑主机&…...
[2023年]-hadoop面试真题(二)
[2023年]-hadoop面试真题(一) (北京) Maptask的个数由什么决定?(北京) 如何判定一个job的map和reduce的数量 ?(北京) MR中Shuffle过程 ?(北京) MR中处理数据流程 ?(…...

蓝桥杯备战刷题-滑动窗口
今天给大家带来的是滑动窗口的类型题,都是十分经典的。 1,无重复字符的最长子串 看例三,我们顺便来说一下子串和子序列的含义 子串是从字符串里面抽出来的一部分,不可以有间隔,顺序也不能打乱。 子序列也是从字符串里…...

LLM(十一)| Claude 3:Anthropic发布最新超越GPT-4大模型
2024年3月4日,Anthropic发布最新多模态大模型:Claude 3系列,共有Haiku、Sonnet和Opus三个版本。 Opus在研究生水平专家推理、基础数学、本科水平专家知识、代码等10个维度,超过OpenAI的GPT-4。 Haiku模型更注重效率,能…...

20-Java备忘录模式 ( Memento Pattern )
Java备忘录模式 摘要实现范例 备忘录模式(Memento Pattern)保存一个对象的某个状态,以便在适当的时候恢复对象 备忘录模式属于行为型模式 摘要 1. 意图 在不破坏封装性的前提下,捕获一个对象的内部状态,并在该对…...
整合生成型AI战略:从宏观思维到小步实践
“整合生成型AI战略:从宏观思维到小步实践” 在这篇文章中,我们探讨了将生成型AI和大型语言模型融入企业核心业务的战略开发方法。我们的方法基于敏捷开发原则,技术专家和数据科学家需要采纳商业思维,而执行官则需理解生成型AI和…...

个人博客系列-后端项目-用户验证(5)
介绍 创建系统管理app,用于管理系统的用户,角色,权限,登录等功能,项目中将使用django-rest_framework进行用户认证和权限解析。这里将完成用户认证 用户验证 rest_framework.authentication模块中的认证类ÿ…...

css3中nth-child属性作用及用法剖析
hello宝子们...我们是艾斯视觉擅长ui设计和前端开发10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩! 标题:CSS3中nth-child属性作用及用法剖析 摘要:CSS3中的nth-child选择器允许我们根据元素位置来定位特定的元素…...

okHttp MediaType MIME格式详解
一、介绍 我们在做数据上传时,经常会用到Okhttp的开源库,okhttp开源库也遵循html提交的MIME数据格式。 所以我们经常会看到applicaiton/json这样的格式在传。 但是如果涉及到其他文件等就需要详细的数据格式,否则服务端无法解析 二、okHt…...

跨境电商三大趋势
跨境电商有着不断发展的三大趋势: 个性化定制:随着消费者需求的不断变化和个性化定制的潮流,跨境电商平台开始提供更多的定制化服务。消费者可以根据自己的需求选择产品的款式、材料和设计,从而获得更加个性化的产品体验。 无界销…...

【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试
【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试 目录 【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试核心概念资源监控生命周期管理Cluster维护安全认证问题排查其他推荐超级课程: Docker快速入门到精通Kubernetes入门到大师通关课这些是我在准备CK...

Mysql数据库-基本表操作
1.表操作 创建表:CREATE TABLE table_name ( field1 datatype, field2 datatype, field3 datatype ) character set 字符集 collate 校验规则 engine 存储引擎; field 表示列名 datatype 表示列的类型 character set 字符集,如果没有指定字符集ÿ…...
OceanBase社区版单节点安装搭建(Docker)
OceanBase社区版单节点安装搭建(Docker) 文章目录 OceanBase社区版单节点安装搭建(Docker)一、环境检查及Docker配置1.1 安装docker1.2 配置docker镜像源 二、OB镜像下载三、obd部署单节点数据库四、创建业务租户、数据库、表4.1 …...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解
突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 安全措施依赖问题 GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...

如何在看板中体现优先级变化
在看板中有效体现优先级变化的关键措施包括:采用颜色或标签标识优先级、设置任务排序规则、使用独立的优先级列或泳道、结合自动化规则同步优先级变化、建立定期的优先级审查流程。其中,设置任务排序规则尤其重要,因为它让看板视觉上直观地体…...
系统设计 --- MongoDB亿级数据查询优化策略
系统设计 --- MongoDB亿级数据查询分表策略 背景Solution --- 分表 背景 使用audit log实现Audi Trail功能 Audit Trail范围: 六个月数据量: 每秒5-7条audi log,共计7千万 – 1亿条数据需要实现全文检索按照时间倒序因为license问题,不能使用ELK只能使用…...

STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
oracle与MySQL数据库之间数据同步的技术要点
Oracle与MySQL数据库之间的数据同步是一个涉及多个技术要点的复杂任务。由于Oracle和MySQL的架构差异,它们的数据同步要求既要保持数据的准确性和一致性,又要处理好性能问题。以下是一些主要的技术要点: 数据结构差异 数据类型差异ÿ…...
【C语言练习】080. 使用C语言实现简单的数据库操作
080. 使用C语言实现简单的数据库操作 080. 使用C语言实现简单的数据库操作使用原生APIODBC接口第三方库ORM框架文件模拟1. 安装SQLite2. 示例代码:使用SQLite创建数据库、表和插入数据3. 编译和运行4. 示例运行输出:5. 注意事项6. 总结080. 使用C语言实现简单的数据库操作 在…...

让AI看见世界:MCP协议与服务器的工作原理
让AI看见世界:MCP协议与服务器的工作原理 MCP(Model Context Protocol)是一种创新的通信协议,旨在让大型语言模型能够安全、高效地与外部资源进行交互。在AI技术快速发展的今天,MCP正成为连接AI与现实世界的重要桥梁。…...
爬虫基础学习day2
# 爬虫设计领域 工商:企查查、天眼查短视频:抖音、快手、西瓜 ---> 飞瓜电商:京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空:抓取所有航空公司价格 ---> 去哪儿自媒体:采集自媒体数据进…...
【HTTP三个基础问题】
面试官您好!HTTP是超文本传输协议,是互联网上客户端和服务器之间传输超文本数据(比如文字、图片、音频、视频等)的核心协议,当前互联网应用最广泛的版本是HTTP1.1,它基于经典的C/S模型,也就是客…...