Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费
证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。
证书处理:
- KeyStore 用于存储客户端的证书和私钥,用于客户端身份验证。
- TrustStore 用于存储受信任的根证书或证书链,用于验证服务器的身份。

合并一下证书:
cat your_cert.pem your_key.key > test.pem

- 合并证书和私钥为一个 PKCS12 文件:
cat your_cert.pem your_key.key > combined.pem
openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias
2,将 PKCS12 文件导入到 Java KeyStore 中:
keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS
要生成 truststore.jks 文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。
下面是生成 truststore.jks 的步骤:
-
获取服务器的根证书或证书链。您可以使用之前提到的
openssl s_client命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts
-
将根证书或证书链保存为
.pem文件。 -
使用
keytool命令将根证书或证书链导入到truststore.jks文件中:keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks
项目集成:
maven集成:
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.5.RELEASE</version></dependency>
nacos配置:
spring:kafka:bootstrap-servers: SSL://connectedca.com:443 ##换成你自己的连接ssl:protocol: TLS
###3这三个密码是你证书配置的时候设置的密码trust-store-password: a123456key-store-password: a123456key-password: a123456consumer:group-id: producer:topic: *.event ##换成你自己的topic
核心配置:
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class KafkaConfiguration {@AutowiredC3ConfigProperties c3ConfigProperties;@Autowiredprivate KafkaConfig kafkaProperties;@Autowiredprivate ResourceLoader resourceLoader;@Beanpublic KafkaAdmin kafkaAdmin() {Map <String, Object> configs = new HashMap <>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());return new KafkaAdmin(configs);}@Beanpublic DefaultKafkaConsumerFactory <String, String> consumerFactory() {Map <String, Object> consumerConfig = new HashMap <>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 设置值的反序列化器为 ErrorHandlingDeserializer2,并配置类型信息consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);consumerConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // 启用类型信息头consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");consumerConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "*.KafkaC3MsgListener"); // 设置默认类型信息consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*.KafkaC3MsgListener"); // 替换为你的实际包名String pemUrl = "";String csrUrl = "";if (c3ConfigProperties.getEnvironment().equals("uat")) {pemUrl = "file/uat/kafka/client.jks";csrUrl = "file/uat/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("pre")) {pemUrl = "file/pre/kafka/client.jks";csrUrl = "file/pre/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("prod")) {pemUrl = "file/prod/kafka/client.jks";csrUrl = "file/prod/kafka/truststore.jks";}try {// 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载Resource pemResource = resourceLoader.getResource("classpath:"+pemUrl);Resource csrResource = resourceLoader.getResource("classpath:"+csrUrl);
// 获取证书文件的路径String keyStorePath = pemResource.getFile().getAbsolutePath();String trustStorePath = csrResource.getFile().getAbsolutePath();consumerConfig.put("ssl.keystore.location", keyStorePath);consumerConfig.put("ssl.truststore.location", trustStorePath);}catch (Exception e){log.error("Resource file error:{}",e.getMessage());}consumerConfig.put("security.protocol", "SSL");consumerConfig.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());consumerConfig.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());consumerConfig.put("ssl.key.password", kafkaProperties.getKeyPassword());return new DefaultKafkaConsumerFactory <>(consumerConfig);}@Beanpublic ConcurrentKafkaListenerContainerFactory <String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory <String, String> factory = new ConcurrentKafkaListenerContainerFactory <>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 设置并发消费者数量factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 错误处理器return factory;}@Beanpublic KafkaC3MsgListener kafkaC3MsgListener() {return new KafkaC3MsgListener();}}
注入配置:
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Data
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.producer.topic}")private String topic;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-password}")private String keyStorePassword;@Value("${spring.kafka.ssl.key-password}")private String keyPassword;}
能够看到这个配置就成功了表示:

然后在监听处理消息即可
————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)
相关文章:
Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费
证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。 证书处理: KeyStore 用于存储客户端的证…...
spark 实验二 RDD编程初级实践
目录 一. pyspark交互式编程示例(学生选课成绩统计) 该系总共有多少学生; 该系DataBase课程共有多少人选修; 各门课程的平均分是多少; 使用累加器计算共有多少人选了DataBase这门课。 二.编写独立应用程序实现数…...
【MySQL】not in遇上null的坑
今天遇到一个问题: 1、当 in 内的字段包含 null 的时候,正常过滤; 2、当 not in 内的字段包含 null 的时候,不能正常过滤,即使满足条件,最终结果也为 空。 测试如下: select * from emp e;当…...
鸿蒙4.0-DevEco Studio界面工程
DevEco Studio界面工程 DevEco Studio 下载与第一个工程新建的第一个工程界面回到Project工程结构来看 DevEco Studio 下载与第一个工程 DevEco Studio 下载地址:点击跳转 https://developer.harmonyos.com/cn/develop/deveco-studio#download 学习课堂以及文档地址…...
前端将html导出pdf文件解决分页问题
这是借鉴了qq_251025116大佬的解决方案并优化升级完成的,原文链接 1.安装依赖 npm install jspdf html2canvas2.使用方法 import htmlToPdffrom ./index.jsconst suc () > {message.success(success);};//记得在需要打印的div上面添加 idlet dom document.que…...
openssl3.2 - exp - 产生随机数
文章目录 openssl3.2 - exp - 产生随机数概述笔记END openssl3.2 - exp - 产生随机数 概述 要用到openssl产生的随机数, 查了资料. 如果用命令行产生随机数, 如下: openssl rand -hex -num 6 48bfd3a64f54单步跟进去, 看到主要就是调用了一个RAND_bytes(), 没其他了. 官方说…...
【三两波折】char *foo[]和char(*foo)[]有何不同?
1、先谈优先级 最高级别 —— 有四个,他们并不像运算符: []数组下标左到右结合()用于(表达式) or 函数名(形参表)左到右结合.读取结构体成员左到右结合->读取结构体成员(通过指针)左到右结合 第二级别…...
k8s(kubernetes)怎么查看pod服务对应哪些docker容器
Kubernetes(k8s)中的Pod是一组共享网络和存储资源的容器集合。每个Pod都包含一个或多个Docker容器,这些容器共享网络命名空间和存储卷,并在同一主机上运行。因此,可以将Pod视为一组紧密相关的Docker容器的逻辑主机&…...
[2023年]-hadoop面试真题(二)
[2023年]-hadoop面试真题(一) (北京) Maptask的个数由什么决定?(北京) 如何判定一个job的map和reduce的数量 ?(北京) MR中Shuffle过程 ?(北京) MR中处理数据流程 ?(…...
蓝桥杯备战刷题-滑动窗口
今天给大家带来的是滑动窗口的类型题,都是十分经典的。 1,无重复字符的最长子串 看例三,我们顺便来说一下子串和子序列的含义 子串是从字符串里面抽出来的一部分,不可以有间隔,顺序也不能打乱。 子序列也是从字符串里…...
LLM(十一)| Claude 3:Anthropic发布最新超越GPT-4大模型
2024年3月4日,Anthropic发布最新多模态大模型:Claude 3系列,共有Haiku、Sonnet和Opus三个版本。 Opus在研究生水平专家推理、基础数学、本科水平专家知识、代码等10个维度,超过OpenAI的GPT-4。 Haiku模型更注重效率,能…...
20-Java备忘录模式 ( Memento Pattern )
Java备忘录模式 摘要实现范例 备忘录模式(Memento Pattern)保存一个对象的某个状态,以便在适当的时候恢复对象 备忘录模式属于行为型模式 摘要 1. 意图 在不破坏封装性的前提下,捕获一个对象的内部状态,并在该对…...
整合生成型AI战略:从宏观思维到小步实践
“整合生成型AI战略:从宏观思维到小步实践” 在这篇文章中,我们探讨了将生成型AI和大型语言模型融入企业核心业务的战略开发方法。我们的方法基于敏捷开发原则,技术专家和数据科学家需要采纳商业思维,而执行官则需理解生成型AI和…...
个人博客系列-后端项目-用户验证(5)
介绍 创建系统管理app,用于管理系统的用户,角色,权限,登录等功能,项目中将使用django-rest_framework进行用户认证和权限解析。这里将完成用户认证 用户验证 rest_framework.authentication模块中的认证类ÿ…...
css3中nth-child属性作用及用法剖析
hello宝子们...我们是艾斯视觉擅长ui设计和前端开发10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩! 标题:CSS3中nth-child属性作用及用法剖析 摘要:CSS3中的nth-child选择器允许我们根据元素位置来定位特定的元素…...
okHttp MediaType MIME格式详解
一、介绍 我们在做数据上传时,经常会用到Okhttp的开源库,okhttp开源库也遵循html提交的MIME数据格式。 所以我们经常会看到applicaiton/json这样的格式在传。 但是如果涉及到其他文件等就需要详细的数据格式,否则服务端无法解析 二、okHt…...
跨境电商三大趋势
跨境电商有着不断发展的三大趋势: 个性化定制:随着消费者需求的不断变化和个性化定制的潮流,跨境电商平台开始提供更多的定制化服务。消费者可以根据自己的需求选择产品的款式、材料和设计,从而获得更加个性化的产品体验。 无界销…...
【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试
【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试 目录 【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试核心概念资源监控生命周期管理Cluster维护安全认证问题排查其他推荐超级课程: Docker快速入门到精通Kubernetes入门到大师通关课这些是我在准备CK...
Mysql数据库-基本表操作
1.表操作 创建表:CREATE TABLE table_name ( field1 datatype, field2 datatype, field3 datatype ) character set 字符集 collate 校验规则 engine 存储引擎; field 表示列名 datatype 表示列的类型 character set 字符集,如果没有指定字符集ÿ…...
OceanBase社区版单节点安装搭建(Docker)
OceanBase社区版单节点安装搭建(Docker) 文章目录 OceanBase社区版单节点安装搭建(Docker)一、环境检查及Docker配置1.1 安装docker1.2 配置docker镜像源 二、OB镜像下载三、obd部署单节点数据库四、创建业务租户、数据库、表4.1 …...
JavaSec-RCE
简介 RCE(Remote Code Execution),可以分为:命令注入(Command Injection)、代码注入(Code Injection) 代码注入 1.漏洞场景:Groovy代码注入 Groovy是一种基于JVM的动态语言,语法简洁,支持闭包、动态类型和Java互操作性,…...
React hook之useRef
React useRef 详解 useRef 是 React 提供的一个 Hook,用于在函数组件中创建可变的引用对象。它在 React 开发中有多种重要用途,下面我将全面详细地介绍它的特性和用法。 基本概念 1. 创建 ref const refContainer useRef(initialValue);initialValu…...
Swift 协议扩展精进之路:解决 CoreData 托管实体子类的类型不匹配问题(下)
概述 在 Swift 开发语言中,各位秃头小码农们可以充分利用语法本身所带来的便利去劈荆斩棘。我们还可以恣意利用泛型、协议关联类型和协议扩展来进一步简化和优化我们复杂的代码需求。 不过,在涉及到多个子类派生于基类进行多态模拟的场景下,…...
mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包
文章目录 现象:mysql已经安装,但是通过rpm -q 没有找mysql相关的已安装包遇到 rpm 命令找不到已经安装的 MySQL 包时,可能是因为以下几个原因:1.MySQL 不是通过 RPM 包安装的2.RPM 数据库损坏3.使用了不同的包名或路径4.使用其他包…...
Linux离线(zip方式)安装docker
目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1:修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本:CentOS 7 64位 内核版本:3.10.0 相关命令: uname -rcat /etc/os-rele…...
Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)
引言 工欲善其事,必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后,我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集,就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...
Linux系统部署KES
1、安装准备 1.版本说明V008R006C009B0014 V008:是version产品的大版本。 R006:是release产品特性版本。 C009:是通用版 B0014:是build开发过程中的构建版本2.硬件要求 #安全版和企业版 内存:1GB 以上 硬盘…...
git: early EOF
macOS报错: Initialized empty Git repository in /usr/local/Homebrew/Library/Taps/homebrew/homebrew-core/.git/ remote: Enumerating objects: 2691797, done. remote: Counting objects: 100% (1760/1760), done. remote: Compressing objects: 100% (636/636…...
redis和redission的区别
Redis 和 Redisson 是两个密切相关但又本质不同的技术,它们扮演着完全不同的角色: Redis: 内存数据库/数据结构存储 本质: 它是一个开源的、高性能的、基于内存的 键值存储数据库。它也可以将数据持久化到磁盘。 核心功能: 提供丰…...
「Java基本语法」变量的使用
变量定义 变量是程序中存储数据的容器,用于保存可变的数据值。在Java中,变量必须先声明后使用,声明时需指定变量的数据类型和变量名。 语法 数据类型 变量名 [ 初始值]; 示例:声明与初始化 public class VariableDemo {publi…...
