SpringKafka生产者、消费者消息拦截
1 前言
在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。
2 基于Kafka管理的拦截器
Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和
org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下:
2.1 定义拦截器
生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 在发送消息之前操作System.out.println("Sending message: " + record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {// 可以在这里获取配置}
}
2.2 定义消费者拦截器
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {// 配置拦截器}@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// 处理接收到的消息records.forEach(record -> {System.out.println("Consumed message: " + record.value());});return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {// 资源清理}
}
2.3 添加拦截器
方式一,通过工厂自定义器设置拦截器
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}@Overridepublic void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));}
}
方式二,通过配置设置拦截器
spring:kafka:producer:properties:interceptor.classes: org.example.kafka.CustomProducerInterceptorconsumer:properties:interceptor.classes: org.example.kafka.CustomConsumerInterceptor
2.4 拦截器使用Spring容器中的Bean
上面的方法可以看到,拦截器由于没有在Spring容器中管理,则无法使用容器中其他Bean来做业务处理,那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例:
Bean定义
@Component
public class MyComponent {public void checkMessage(String message) {System.out.println("Sending message: " + message);}
}
生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {private MyComponent myComponent;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {myComponent.checkMessage(record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {myComponent = configs.get("myComponent");}
}
设置拦截器
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {@Autowiredprivate MyComponent myComponent;@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of("myComponent", myComponent,ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}
}
3 基于Spring-Kafka管理的拦截器
基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别(ConsumerRecords),如果要对单条消息拦截,可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。
3.1 单条消息拦截接口定义
由于此拦截器是受Spring容器管理的,所以可以通过@Component注解自动注入到容器中,进行自动拦截。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;@Component
public class CustomRecordInterceptor implements RecordInterceptor<Object, Object> {@Overridepublic ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record) {System.out.println(record.topic());return record;}
}
相关文章:
SpringKafka生产者、消费者消息拦截
1 前言 在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。 2 基于Kafka管理的拦截器 Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和 org.apache.kafka.cli…...
Qt报错QOCI driver not loaded且QOCI available的解决方法
参考 Linux Qt 6安装Oracle QOCI SQL Driver插件(适用WSL) 安装 QOCI 插件完成后运行 Qt 项目报错: qt.sql.qsqldatabase: QSqlDatabase: QOCI driver not loaded qt.sql.qsqldatabase: QSqlDatabase: available drivers: QMIMER QPSQL QODBC…...
python mac vscode 脚本文件的运行
切换到脚本文件的目录下 路径的修改 当前文件组织形式: 脚本文件在文件夹下: 赋予权限:chmod x ./scripts/fscd_test.sh 运行:./scripts/fscd_test.sh...
Linux之du命令
华子目录 du命令常用选项示例注意事项 du命令 du(Disk Usage)命令是用于在类Unix操作系统(如Linux和macOS)中显示文件和目录所占用的磁盘空间大小的工具。它可以递归地计算目录和文件的磁盘使用情况,并提供详细的报告…...
WRF-LES与PALM微尺度气象大涡模拟
针对微尺度气象的复杂性,大涡模拟(LES)提供了一种无可比拟的解决方案。微尺度气象学涉及对小范围内的大气过程进行精确模拟,这些过程往往与天气模式、地形影响和人为因素如城市布局紧密相关。在这种规模上,传统的气象模…...
桌面程序开发框架选择
桌面程序开发框架选择 1、WinForm(Windows Form)优点缺点 2、WPF(Windows Presentation Foundation)优点缺点 3、Electron优点缺点 4、Delphi优点缺点 5、QT优点缺点 6、MFC(Microsoft Foundation Class Library)优点缺点 7、JavaFX优点缺点 8、SwingAWT9、Avalonia10、Flutter…...
Vue项目开发:Vuex使用,表单验证配置,ESLint关闭与常见问题解决方案
文章目录 vuexvue配置form表单验证移除vue中表单验证的两种方法关闭vue项目的eslint代码校验做vue项目出现的问题 vuex Vue提供的状态管理工具,用于统一管理我们项目中各种数据的交互和重用,存储我们需要用到的数据对象属性 state:vuex的基本…...
源鲁杯2024赛题复现Web Misc部分WP
MISC [Round 1] whatmusic 拿到题目,一个password和加密的压缩包 查看password的文件尾 这里会发现是png文件的文件头,逆序输出,并保存为1.png。得到一个图片, 进行CRC爆破,发现宽高被修改,之后拿到压缩…...
【企业微信新版sdk】
wecom 的引入使用 一、引入wecom二、封装函数三、使用测试 一、引入wecom 1、企业微信 WECOM-JSSDK提供了 npm 和 cdn 两种引入途径。1.1、 npm 引入 npm install wecom/jssdk1.2、安装后引入 import * as ww from wecom/jssdk通过 script 标签引入 <script src"ht…...
web安全测试渗透案例知识点总结(下)——小白入狱
目录 [TOC](目录)一、更多详细的实际案例教程案例1:文件上传漏洞利用案例2:目录遍历(Path Traversal)漏洞检测案例3:暴力破解登录密码案例4:命令注入漏洞案例5:身份认证绕过(Passwor…...
【专题】数据库的安全性
1. 数据库安全性概述 数据库存在的不安全因素: 非授权用户对数据库的恶意存取和破坏; 数据库中重要或敏感的数据被泄露; 安全环境的脆弱性。 数据库的安全性与计算机系统的安全性,包括计算机硬件、操作系统、网络系统等的安全…...
【含开题报告+文档+源码】基于Java的房屋租赁服务系统设计与实现
开题报告 随着城市化进程的加速和人口流动性的增加,租房需求不断增长。传统的租赁方式往往存在信息不对称、流程不规范等问题,使得租户和房东的租赁体验不佳。而而房屋租赁系统能够提供便捷、高效的租赁服务,满足租户和房东的需求。房屋租赁…...
数据结构模拟题[十]
数据结构试卷(十) 一、选择题 (24 分) 1.下列程序段的时间复杂度为( )。 i0 ,s0; while (s<n) {ssi ;i ;} (A) O(n 1/2 ) (B) O(n 1/3 ) (C) O(n) (D) O(n 2 ) …...
Java基于微信小程序的美食推荐系统(附源码,文档)
博主介绍:✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…...
基于CNN-RNN的影像报告生成
项目源码获取方式见文章末尾! 600多个深度学习项目资料,快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【PaddleNLP的FAQ问答机器人】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模型实现…...
MacOS如何读取磁盘原始的扇区内容,恢复误删除的数据
MacOS 也是把磁盘当成一个文件,也是可以使用 dd来读取,命行令行如下: sudo dd if/dev/disk2 bs512 count1 skip100 ofsector_100.bin 这个就是读取 /dev/disk2这个磁盘每100这个sector, bs表示扇区大小是512. 但是你直接用读,应…...
创客匠人:打造IP陷入迷茫?20位大咖直播如何破局,实现财富增长
就在明天! 11月4日-8日,全球创始人IP领袖峰会线上启动会——《2025年知识IP创新增长训练营*20位各界大咖标杆集体赋能》直播活动,即将重磅来袭! 20位行业大咖5大篇章主题内容的强大组合,将为你逐一精彩呈现ÿ…...
视觉目标检测标注xml格式文件解析可视化 - python 实现
视觉目标检测任务,通常用 labelimage标注,对应的标注文件为xml。 该示例来源于开源项目:https://gitcode.com/DataBall/DataBall-detections-100s/overview 读取 xml 标注文件,并进行可视化示例如下: #-*-coding:ut…...
clion远程配置docker ros2
CLION与docker中的ROS2环境构建远程连接 设备前提开启SSH服务CLION配置CLION配置CLION IDE远程连接过程实现CLION SSH 远程部署 开启fastlio2debug之旅 设备前提 本地宿主机:UBUNTU 20.04 docker container:ros2_container (内置环境ROS2 humble) 通过之前的tcp连接…...
微信小程序 uniapp 腾讯地图的调用
/* 提前在您的app.json上加上这些代码 "permission": { "scope.userLocation": { "desc": "你的位置信息将用于地图中定位" } …...
[2025CVPR]DeepVideo-R1:基于难度感知回归GRPO的视频强化微调框架详解
突破视频大语言模型推理瓶颈,在多个视频基准上实现SOTA性能 一、核心问题与创新亮点 1.1 GRPO在视频任务中的两大挑战 安全措施依赖问题 GRPO使用min和clip函数限制策略更新幅度,导致: 梯度抑制:当新旧策略差异过大时梯度消失收敛困难:策略无法充分优化# 传统GRPO的梯…...
Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...
智慧医疗能源事业线深度画像分析(上)
引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...
Java 语言特性(面试系列1)
一、面向对象编程 1. 封装(Encapsulation) 定义:将数据(属性)和操作数据的方法绑定在一起,通过访问控制符(private、protected、public)隐藏内部实现细节。示例: public …...
基于ASP.NET+ SQL Server实现(Web)医院信息管理系统
医院信息管理系统 1. 课程设计内容 在 visual studio 2017 平台上,开发一个“医院信息管理系统”Web 程序。 2. 课程设计目的 综合运用 c#.net 知识,在 vs 2017 平台上,进行 ASP.NET 应用程序和简易网站的开发;初步熟悉开发一…...
基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容
基于 UniApp + WebSocket实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
C++ 基础特性深度解析
目录 引言 一、命名空间(namespace) C 中的命名空间 与 C 语言的对比 二、缺省参数 C 中的缺省参数 与 C 语言的对比 三、引用(reference) C 中的引用 与 C 语言的对比 四、inline(内联函数…...
从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
