当前位置: 首页 > news >正文

Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费

证书准备:springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换,因为kafka里面是jks 需要通过openssl进行转换。

证书处理:

  • KeyStore 用于存储客户端的证书和私钥,用于客户端身份验证。
  • TrustStore 用于存储受信任的根证书或证书链,用于验证服务器的身份。

合并一下证书:

cat your_cert.pem your_key.key > test.pem

  1. 合并证书和私钥为一个 PKCS12 文件:
cat your_cert.pem your_key.key > combined.pem
openssl pkcs12 -export -in combined.pem -out client.p12 -name your_alias

2,将 PKCS12 文件导入到 Java KeyStore 中:

keytool -importkeystore -srckeystore client.p12 -srcstoretype PKCS12 -destkeystore client.jks -deststoretype JKS

要生成 truststore.jks 文件,您需要导入服务器的根证书或者服务器的证书链。这样,您的客户端应用程序就可以验证与服务器建立的 SSL 连接。

下面是生成 truststore.jks 的步骤:

  1. 获取服务器的根证书或证书链。您可以使用之前提到的 openssl s_client 命令来获取证书链。openssl s_client -connect 你的连接域名 -showcerts

  2. 将根证书或证书链保存为 .pem 文件。

  3. 使用 keytool 命令将根证书或证书链导入到 truststore.jks 文件中:

    keytool -importcert -file your_root_cert.pem -alias root_alias -keystore truststore.jks

 

项目集成:

maven集成:

  <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.5.5.RELEASE</version></dependency>

nacos配置:

spring:kafka:bootstrap-servers: SSL://connectedca.com:443  ##换成你自己的连接ssl:protocol: TLS
###3这三个密码是你证书配置的时候设置的密码trust-store-password: a123456key-store-password: a123456key-password: a123456consumer:group-id: producer:topic: *.event  ##换成你自己的topic

核心配置:


import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class KafkaConfiguration {@AutowiredC3ConfigProperties c3ConfigProperties;@Autowiredprivate KafkaConfig kafkaProperties;@Autowiredprivate ResourceLoader resourceLoader;@Beanpublic KafkaAdmin kafkaAdmin() {Map <String, Object> configs = new HashMap <>();configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());return new KafkaAdmin(configs);}@Beanpublic DefaultKafkaConsumerFactory <String, String> consumerFactory() {Map <String, Object> consumerConfig = new HashMap <>();consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "newbie-car-owner-data-sync");consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// 设置值的反序列化器为 ErrorHandlingDeserializer2,并配置类型信息consumerConfig.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);consumerConfig.put(JsonDeserializer.USE_TYPE_INFO_HEADERS, false); // 启用类型信息头consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");consumerConfig.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "*.KafkaC3MsgListener"); // 设置默认类型信息consumerConfig.put(JsonDeserializer.TRUSTED_PACKAGES, "*.KafkaC3MsgListener"); // 替换为你的实际包名String pemUrl = "";String csrUrl = "";if (c3ConfigProperties.getEnvironment().equals("uat")) {pemUrl = "file/uat/kafka/client.jks";csrUrl = "file/uat/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("pre")) {pemUrl = "file/pre/kafka/client.jks";csrUrl = "file/pre/kafka/truststore.jks";} else if (c3ConfigProperties.getEnvironment().equals("prod")) {pemUrl = "file/prod/kafka/client.jks";csrUrl = "file/prod/kafka/truststore.jks";}try {// 获取证书资源 容器部署一定要用这种方式读取文件,要不然会报错,或者使用挂载Resource pemResource = resourceLoader.getResource("classpath:"+pemUrl);Resource csrResource = resourceLoader.getResource("classpath:"+csrUrl);
// 获取证书文件的路径String keyStorePath = pemResource.getFile().getAbsolutePath();String trustStorePath = csrResource.getFile().getAbsolutePath();consumerConfig.put("ssl.keystore.location", keyStorePath);consumerConfig.put("ssl.truststore.location", trustStorePath);}catch (Exception e){log.error("Resource file error:{}",e.getMessage());}consumerConfig.put("security.protocol", "SSL");consumerConfig.put("ssl.truststore.password", kafkaProperties.getTrustStorePassword());consumerConfig.put("ssl.keystore.password", kafkaProperties.getKeyStorePassword());consumerConfig.put("ssl.key.password", kafkaProperties.getKeyPassword());return new DefaultKafkaConsumerFactory <>(consumerConfig);}@Beanpublic ConcurrentKafkaListenerContainerFactory <String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory <String, String> factory = new ConcurrentKafkaListenerContainerFactory <>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(3); // 设置并发消费者数量factory.setErrorHandler(new SeekToCurrentErrorHandler()); // 错误处理器return factory;}@Beanpublic KafkaC3MsgListener kafkaC3MsgListener() {return new KafkaC3MsgListener();}}

注入配置:


import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;@Data
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.group-id}")private String groupId;@Value("${spring.kafka.producer.topic}")private String topic;@Value("${spring.kafka.ssl.trust-store-password}")private String trustStorePassword;@Value("${spring.kafka.ssl.key-store-password}")private String keyStorePassword;@Value("${spring.kafka.ssl.key-password}")private String keyPassword;}

能够看到这个配置就成功了表示:

然后在监听处理消息即可

 ————没有与生俱来的天赋,都是后天的努力拼搏(我是小杨,谢谢你的关注和支持)

相关文章:

Springboot 集成kafka 消费者实现ssl方式连接监听消息实现消费

证书准备&#xff1a;springboot集成kafka 消费者实现 如何配置是ssl方式连接的时候需要进行证书的转换。原始的证书是pem, 或者csr方式 和key方式的时候需要转换&#xff0c;因为kafka里面是jks 需要通过openssl进行转换。 证书处理&#xff1a; KeyStore 用于存储客户端的证…...

spark 实验二 RDD编程初级实践

目录 一. pyspark交互式编程示例&#xff08;学生选课成绩统计&#xff09; 该系总共有多少学生&#xff1b; 该系DataBase课程共有多少人选修&#xff1b; 各门课程的平均分是多少&#xff1b; 使用累加器计算共有多少人选了DataBase这门课。 二.编写独立应用程序实现数…...

【MySQL】not in遇上null的坑

今天遇到一个问题&#xff1a; 1、当 in 内的字段包含 null 的时候&#xff0c;正常过滤&#xff1b; 2、当 not in 内的字段包含 null 的时候&#xff0c;不能正常过滤&#xff0c;即使满足条件&#xff0c;最终结果也为 空。 测试如下&#xff1a; select * from emp e;当…...

鸿蒙4.0-DevEco Studio界面工程

DevEco Studio界面工程 DevEco Studio 下载与第一个工程新建的第一个工程界面回到Project工程结构来看 DevEco Studio 下载与第一个工程 DevEco Studio 下载地址&#xff1a;点击跳转 https://developer.harmonyos.com/cn/develop/deveco-studio#download 学习课堂以及文档地址…...

前端将html导出pdf文件解决分页问题

这是借鉴了qq_251025116大佬的解决方案并优化升级完成的&#xff0c;原文链接 1.安装依赖 npm install jspdf html2canvas2.使用方法 import htmlToPdffrom ./index.jsconst suc () > {message.success(success);};//记得在需要打印的div上面添加 idlet dom document.que…...

openssl3.2 - exp - 产生随机数

文章目录 openssl3.2 - exp - 产生随机数概述笔记END openssl3.2 - exp - 产生随机数 概述 要用到openssl产生的随机数, 查了资料. 如果用命令行产生随机数, 如下: openssl rand -hex -num 6 48bfd3a64f54单步跟进去, 看到主要就是调用了一个RAND_bytes(), 没其他了. 官方说…...

【三两波折】char *foo[]和char(*foo)[]有何不同?

1、先谈优先级 最高级别 —— 有四个&#xff0c;他们并不像运算符&#xff1a; []数组下标左到右结合()用于&#xff08;表达式&#xff09; or 函数名(形参表)左到右结合.读取结构体成员左到右结合->读取结构体成员&#xff08;通过指针&#xff09;左到右结合 第二级别…...

k8s(kubernetes)怎么查看pod服务对应哪些docker容器

Kubernetes&#xff08;k8s&#xff09;中的Pod是一组共享网络和存储资源的容器集合。每个Pod都包含一个或多个Docker容器&#xff0c;这些容器共享网络命名空间和存储卷&#xff0c;并在同一主机上运行。因此&#xff0c;可以将Pod视为一组紧密相关的Docker容器的逻辑主机&…...

[2023年]-hadoop面试真题(二)

[2023年]-hadoop面试真题(一) &#xff08;北京&#xff09; Maptask的个数由什么决定?&#xff08;北京&#xff09; 如何判定一个job的map和reduce的数量 ?&#xff08;北京&#xff09; MR中Shuffle过程 ?&#xff08;北京&#xff09; MR中处理数据流程 ?&#xff08;…...

蓝桥杯备战刷题-滑动窗口

今天给大家带来的是滑动窗口的类型题&#xff0c;都是十分经典的。 1&#xff0c;无重复字符的最长子串 看例三&#xff0c;我们顺便来说一下子串和子序列的含义 子串是从字符串里面抽出来的一部分&#xff0c;不可以有间隔&#xff0c;顺序也不能打乱。 子序列也是从字符串里…...

LLM(十一)| Claude 3:Anthropic发布最新超越GPT-4大模型

2024年3月4日&#xff0c;Anthropic发布最新多模态大模型&#xff1a;Claude 3系列&#xff0c;共有Haiku、Sonnet和Opus三个版本。 Opus在研究生水平专家推理、基础数学、本科水平专家知识、代码等10个维度&#xff0c;超过OpenAI的GPT-4。 Haiku模型更注重效率&#xff0c;能…...

20-Java备忘录模式 ( Memento Pattern )

Java备忘录模式 摘要实现范例 备忘录模式&#xff08;Memento Pattern&#xff09;保存一个对象的某个状态&#xff0c;以便在适当的时候恢复对象 备忘录模式属于行为型模式 摘要 1. 意图 在不破坏封装性的前提下&#xff0c;捕获一个对象的内部状态&#xff0c;并在该对…...

整合生成型AI战略:从宏观思维到小步实践

“整合生成型AI战略&#xff1a;从宏观思维到小步实践” 在这篇文章中&#xff0c;我们探讨了将生成型AI和大型语言模型融入企业核心业务的战略开发方法。我们的方法基于敏捷开发原则&#xff0c;技术专家和数据科学家需要采纳商业思维&#xff0c;而执行官则需理解生成型AI和…...

个人博客系列-后端项目-用户验证(5)

介绍 创建系统管理app&#xff0c;用于管理系统的用户&#xff0c;角色&#xff0c;权限&#xff0c;登录等功能&#xff0c;项目中将使用django-rest_framework进行用户认证和权限解析。这里将完成用户认证 用户验证 rest_framework.authentication模块中的认证类&#xff…...

css3中nth-child属性作用及用法剖析

hello宝子们...我们是艾斯视觉擅长ui设计和前端开发10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩! 标题&#xff1a;CSS3中nth-child属性作用及用法剖析 摘要&#xff1a;CSS3中的nth-child选择器允许我们根据元素位置来定位特定的元素…...

okHttp MediaType MIME格式详解

一、介绍 我们在做数据上传时&#xff0c;经常会用到Okhttp的开源库&#xff0c;okhttp开源库也遵循html提交的MIME数据格式。 所以我们经常会看到applicaiton/json这样的格式在传。 但是如果涉及到其他文件等就需要详细的数据格式&#xff0c;否则服务端无法解析 二、okHt…...

跨境电商三大趋势

跨境电商有着不断发展的三大趋势&#xff1a; 个性化定制&#xff1a;随着消费者需求的不断变化和个性化定制的潮流&#xff0c;跨境电商平台开始提供更多的定制化服务。消费者可以根据自己的需求选择产品的款式、材料和设计&#xff0c;从而获得更加个性化的产品体验。 无界销…...

【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试

【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试 目录 【DevOps基础篇之k8s】如何通过Kubernetes CKA认证考试核心概念资源监控生命周期管理Cluster维护安全认证问题排查其他推荐超级课程: Docker快速入门到精通Kubernetes入门到大师通关课这些是我在准备CK...

Mysql数据库-基本表操作

1.表操作 创建表&#xff1a;CREATE TABLE table_name ( field1 datatype, field2 datatype, field3 datatype ) character set 字符集 collate 校验规则 engine 存储引擎; field 表示列名 datatype 表示列的类型 character set 字符集&#xff0c;如果没有指定字符集&#xff…...

OceanBase社区版单节点安装搭建(Docker)

OceanBase社区版单节点安装搭建&#xff08;Docker&#xff09; 文章目录 OceanBase社区版单节点安装搭建&#xff08;Docker&#xff09;一、环境检查及Docker配置1.1 安装docker1.2 配置docker镜像源 二、OB镜像下载三、obd部署单节点数据库四、创建业务租户、数据库、表4.1 …...

相机Camera日志实例分析之二:相机Camx【专业模式开启直方图拍照】单帧流程日志详解

【关注我&#xff0c;后续持续新增专题博文&#xff0c;谢谢&#xff01;&#xff01;&#xff01;】 上一篇我们讲了&#xff1a; 这一篇我们开始讲&#xff1a; 目录 一、场景操作步骤 二、日志基础关键字分级如下 三、场景日志如下&#xff1a; 一、场景操作步骤 操作步…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

MySQL 8.0 OCP 英文题库解析(十三)

Oracle 为庆祝 MySQL 30 周年&#xff0c;截止到 2025.07.31 之前。所有人均可以免费考取原价245美元的MySQL OCP 认证。 从今天开始&#xff0c;将英文题库免费公布出来&#xff0c;并进行解析&#xff0c;帮助大家在一个月之内轻松通过OCP认证。 本期公布试题111~120 试题1…...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生&#xff0c;我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要&#xff0c;而您认真负责的教学态度&#xff0c;让课程的每一部分都充满了实用价值。 尤其让我…...

Java 二维码

Java 二维码 **技术&#xff1a;**谷歌 ZXing 实现 首先添加依赖 <!-- 二维码依赖 --><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.5.1</version></dependency><de…...

安卓基础(aar)

重新设置java21的环境&#xff0c;临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的&#xff1a; MyApp/ ├── app/ …...

python报错No module named ‘tensorflow.keras‘

是由于不同版本的tensorflow下的keras所在的路径不同&#xff0c;结合所安装的tensorflow的目录结构修改from语句即可。 原语句&#xff1a; from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense 修改后&#xff1a; from tensorflow.python.keras.lay…...

【Java学习笔记】BigInteger 和 BigDecimal 类

BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点&#xff1a;传参类型必须是类对象 一、BigInteger 1. 作用&#xff1a;适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...

中医有效性探讨

文章目录 西医是如何发展到以生物化学为药理基础的现代医学&#xff1f;传统医学奠基期&#xff08;远古 - 17 世纪&#xff09;近代医学转型期&#xff08;17 世纪 - 19 世纪末&#xff09;​现代医学成熟期&#xff08;20世纪至今&#xff09; 中医的源远流长和一脉相承远古至…...

20个超级好用的 CSS 动画库

分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码&#xff0c;而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库&#xff0c;可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画&#xff0c;可以包含在你的网页或应用项目中。 3.An…...