SpringKafka生产者、消费者消息拦截
1 前言
在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。
2 基于Kafka管理的拦截器
Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和
org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下:
2.1 定义拦截器
生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 在发送消息之前操作System.out.println("Sending message: " + record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {// 可以在这里获取配置}
}
2.2 定义消费者拦截器
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {// 配置拦截器}@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// 处理接收到的消息records.forEach(record -> {System.out.println("Consumed message: " + record.value());});return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {// 资源清理}
}
2.3 添加拦截器
方式一,通过工厂自定义器设置拦截器
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}@Overridepublic void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));}
}
方式二,通过配置设置拦截器
spring:kafka:producer:properties:interceptor.classes: org.example.kafka.CustomProducerInterceptorconsumer:properties:interceptor.classes: org.example.kafka.CustomConsumerInterceptor
2.4 拦截器使用Spring容器中的Bean
上面的方法可以看到,拦截器由于没有在Spring容器中管理,则无法使用容器中其他Bean来做业务处理,那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例:
Bean定义
@Component
public class MyComponent {public void checkMessage(String message) {System.out.println("Sending message: " + message);}
}
生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {private MyComponent myComponent;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {myComponent.checkMessage(record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {myComponent = configs.get("myComponent");}
}
设置拦截器
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {@Autowiredprivate MyComponent myComponent;@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of("myComponent", myComponent,ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}
}
3 基于Spring-Kafka管理的拦截器
基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别(ConsumerRecords),如果要对单条消息拦截,可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。
3.1 单条消息拦截接口定义
由于此拦截器是受Spring容器管理的,所以可以通过@Component注解自动注入到容器中,进行自动拦截。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;@Component
public class CustomRecordInterceptor implements RecordInterceptor<Object, Object> {@Overridepublic ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record) {System.out.println(record.topic());return record;}
}
相关文章:
SpringKafka生产者、消费者消息拦截
1 前言 在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。 2 基于Kafka管理的拦截器 Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和 org.apache.kafka.cli…...
Qt报错QOCI driver not loaded且QOCI available的解决方法
参考 Linux Qt 6安装Oracle QOCI SQL Driver插件(适用WSL) 安装 QOCI 插件完成后运行 Qt 项目报错: qt.sql.qsqldatabase: QSqlDatabase: QOCI driver not loaded qt.sql.qsqldatabase: QSqlDatabase: available drivers: QMIMER QPSQL QODBC…...
python mac vscode 脚本文件的运行
切换到脚本文件的目录下 路径的修改 当前文件组织形式: 脚本文件在文件夹下: 赋予权限:chmod x ./scripts/fscd_test.sh 运行:./scripts/fscd_test.sh...
Linux之du命令
华子目录 du命令常用选项示例注意事项 du命令 du(Disk Usage)命令是用于在类Unix操作系统(如Linux和macOS)中显示文件和目录所占用的磁盘空间大小的工具。它可以递归地计算目录和文件的磁盘使用情况,并提供详细的报告…...
WRF-LES与PALM微尺度气象大涡模拟
针对微尺度气象的复杂性,大涡模拟(LES)提供了一种无可比拟的解决方案。微尺度气象学涉及对小范围内的大气过程进行精确模拟,这些过程往往与天气模式、地形影响和人为因素如城市布局紧密相关。在这种规模上,传统的气象模…...
桌面程序开发框架选择
桌面程序开发框架选择 1、WinForm(Windows Form)优点缺点 2、WPF(Windows Presentation Foundation)优点缺点 3、Electron优点缺点 4、Delphi优点缺点 5、QT优点缺点 6、MFC(Microsoft Foundation Class Library)优点缺点 7、JavaFX优点缺点 8、SwingAWT9、Avalonia10、Flutter…...
Vue项目开发:Vuex使用,表单验证配置,ESLint关闭与常见问题解决方案
文章目录 vuexvue配置form表单验证移除vue中表单验证的两种方法关闭vue项目的eslint代码校验做vue项目出现的问题 vuex Vue提供的状态管理工具,用于统一管理我们项目中各种数据的交互和重用,存储我们需要用到的数据对象属性 state:vuex的基本…...
源鲁杯2024赛题复现Web Misc部分WP
MISC [Round 1] whatmusic 拿到题目,一个password和加密的压缩包 查看password的文件尾 这里会发现是png文件的文件头,逆序输出,并保存为1.png。得到一个图片, 进行CRC爆破,发现宽高被修改,之后拿到压缩…...
【企业微信新版sdk】
wecom 的引入使用 一、引入wecom二、封装函数三、使用测试 一、引入wecom 1、企业微信 WECOM-JSSDK提供了 npm 和 cdn 两种引入途径。1.1、 npm 引入 npm install wecom/jssdk1.2、安装后引入 import * as ww from wecom/jssdk通过 script 标签引入 <script src"ht…...
web安全测试渗透案例知识点总结(下)——小白入狱
目录 [TOC](目录)一、更多详细的实际案例教程案例1:文件上传漏洞利用案例2:目录遍历(Path Traversal)漏洞检测案例3:暴力破解登录密码案例4:命令注入漏洞案例5:身份认证绕过(Passwor…...
【专题】数据库的安全性
1. 数据库安全性概述 数据库存在的不安全因素: 非授权用户对数据库的恶意存取和破坏; 数据库中重要或敏感的数据被泄露; 安全环境的脆弱性。 数据库的安全性与计算机系统的安全性,包括计算机硬件、操作系统、网络系统等的安全…...
【含开题报告+文档+源码】基于Java的房屋租赁服务系统设计与实现
开题报告 随着城市化进程的加速和人口流动性的增加,租房需求不断增长。传统的租赁方式往往存在信息不对称、流程不规范等问题,使得租户和房东的租赁体验不佳。而而房屋租赁系统能够提供便捷、高效的租赁服务,满足租户和房东的需求。房屋租赁…...
数据结构模拟题[十]
数据结构试卷(十) 一、选择题 (24 分) 1.下列程序段的时间复杂度为( )。 i0 ,s0; while (s<n) {ssi ;i ;} (A) O(n 1/2 ) (B) O(n 1/3 ) (C) O(n) (D) O(n 2 ) …...
Java基于微信小程序的美食推荐系统(附源码,文档)
博主介绍:✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…...
基于CNN-RNN的影像报告生成
项目源码获取方式见文章末尾! 600多个深度学习项目资料,快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【PaddleNLP的FAQ问答机器人】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模型实现…...
MacOS如何读取磁盘原始的扇区内容,恢复误删除的数据
MacOS 也是把磁盘当成一个文件,也是可以使用 dd来读取,命行令行如下: sudo dd if/dev/disk2 bs512 count1 skip100 ofsector_100.bin 这个就是读取 /dev/disk2这个磁盘每100这个sector, bs表示扇区大小是512. 但是你直接用读,应…...
创客匠人:打造IP陷入迷茫?20位大咖直播如何破局,实现财富增长
就在明天! 11月4日-8日,全球创始人IP领袖峰会线上启动会——《2025年知识IP创新增长训练营*20位各界大咖标杆集体赋能》直播活动,即将重磅来袭! 20位行业大咖5大篇章主题内容的强大组合,将为你逐一精彩呈现ÿ…...
视觉目标检测标注xml格式文件解析可视化 - python 实现
视觉目标检测任务,通常用 labelimage标注,对应的标注文件为xml。 该示例来源于开源项目:https://gitcode.com/DataBall/DataBall-detections-100s/overview 读取 xml 标注文件,并进行可视化示例如下: #-*-coding:ut…...
clion远程配置docker ros2
CLION与docker中的ROS2环境构建远程连接 设备前提开启SSH服务CLION配置CLION配置CLION IDE远程连接过程实现CLION SSH 远程部署 开启fastlio2debug之旅 设备前提 本地宿主机:UBUNTU 20.04 docker container:ros2_container (内置环境ROS2 humble) 通过之前的tcp连接…...
微信小程序 uniapp 腾讯地图的调用
/* 提前在您的app.json上加上这些代码 "permission": { "scope.userLocation": { "desc": "你的位置信息将用于地图中定位" } …...
Win10 22H2多合一版本实测:家庭版/专业版/企业版到底有什么区别?
Win10 22H2多合一版本深度解析:如何根据需求选择最佳系统版本 当你面对一个包含家庭版、专业版、企业版等多个版本的Win10 22H2多合一ISO镜像时,是否曾感到困惑:这些版本之间究竟有什么区别?哪个版本最适合我的使用场景࿱…...
SmolVLA部署案例:树莓派5+USB GPU加速器运行SmolVLA轻量版可行性探索
SmolVLA部署案例:树莓派5USB GPU加速器运行SmolVLA轻量版可行性探索 1. 引言 你有没有想过,让一个巴掌大的树莓派也能跑起来一个能“看懂”世界、听懂指令、并控制机器人动作的AI模型?这听起来像是科幻电影里的场景,但今天我们要…...
别再死记硬背TTS原理了!用Python+TensorFlow复现一个简易Deep Voice,从音素到语音全流程拆解
用PythonTensorFlow实战Deep Voice:从音素到语音的完整实现指南 当你第一次听到计算机生成的语音时,是否好奇过这背后的魔法是如何实现的?现代文本转语音(TTS)系统已经能够产生几乎与真人无异的语音,而Deep Voice作为早期端到端TT…...
避开这些坑!用MATLAB做QPSK调制解调仿真时,你的成形滤波和匹配滤波设置对了吗?
QPSK仿真中的成形滤波与匹配滤波陷阱:MATLAB实战避坑指南 在数字通信系统的设计与验证过程中,MATLAB仿真扮演着至关重要的角色。许多工程师和研究人员在QPSK调制解调仿真中,常常遇到性能不达预期或结果与理论不符的情况。本文将深入剖析成形滤…...
LeetCode 102. Binary Tree Level Order Traversal 题解
LeetCode 102. Binary Tree Level Order Traversal 题解 题目描述 给你二叉树的根节点 root,返回其节点值的 层序遍历。 (即逐层地,从左到右访问所有节点)。 示例 1: 输入:root [3,9,20,null,null,15,7] 输…...
吃透Redis核心数据结构:从原理到实战,避开90%的坑
Redis之所以能成为分布式系统的“性能神器”,核心在于其高效的内存数据结构设计。很多开发者对Redis的认知停留在“SET/GET缓存”,只会用最基础的字符串类型,却忽略了List、Hash、Set、ZSet等核心结构的强大能力,导致代码冗余、性…...
GTE-Pro物流应用:运单文本的智能处理
GTE-Pro物流应用:运单文本的智能处理 1. 物流行业的文本处理挑战 每天,物流公司都要处理海量的运单文本和客服对话。这些文本数据里藏着宝贵的信息,但传统的关键词匹配方法往往力不从心。 想象一下这样的场景:一个运单上写着&q…...
半方差函数四大参数保姆级解读:从块金值到变程的空间自相关分析
半方差函数四大参数保姆级解读:从块金值到变程的空间自相关分析 刚接触地理统计时,看到"半方差函数"这个术语总让人望而生畏。但当我第一次用气象站数据绘制出那条神奇的曲线时,突然理解了空间数据背后隐藏的对话——就像侦探通过蛛…...
Qwen3-ASR-1.7B多说话人分离展示:会议录音自动分角色
Qwen3-ASR-1.7B多说话人分离展示:会议录音自动分角色 会议记录不再需要人工分辨谁说了什么,AI现在能帮你自动区分每个发言人 1. 引言 想象一下这样的场景:一场两小时的多人会议刚刚结束,你需要整理会议纪要。传统的做法是反复听录…...
工业质检新革命:无需标注数据,用ChatGPT式对话完成目标定位
工业质检新革命:无需标注数据,用ChatGPT式对话完成目标定位 1. 传统工业质检的痛点与挑战 在制造业的质检环节中,目标定位一直是个技术难题。传统方法通常需要: 大量标注数据训练专用模型针对每种产品定制算法频繁调整参数适应…...
