当前位置: 首页 > news >正文

SpringKafka生产者、消费者消息拦截

1 前言

在Spring Kafka中,可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。

2 基于Kafka管理的拦截器

Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor
org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下:

2.1 定义拦截器

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {// 在发送消息之前操作System.out.println("Sending message: " + record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {// 可以在这里获取配置}
}

2.2 定义消费者拦截器

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic void configure(Map<String, ?> configs) {// 配置拦截器}@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// 处理接收到的消息records.forEach(record -> {System.out.println("Consumed message: " + record.value());});return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {// 资源清理}
}

2.3 添加拦截器

方式一,通过工厂自定义器设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}@Overridepublic void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));}
}

方式二,通过配置设置拦截器

spring:kafka:producer:properties:interceptor.classes: org.example.kafka.CustomProducerInterceptorconsumer:properties:interceptor.classes: org.example.kafka.CustomConsumerInterceptor

2.4 拦截器使用Spring容器中的Bean

上面的方法可以看到,拦截器由于没有在Spring容器中管理,则无法使用容器中其他Bean来做业务处理,那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例:
Bean定义

@Component
public class MyComponent {public void checkMessage(String message) {System.out.println("Sending message: " + message);}
}

生产者拦截器

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {private MyComponent myComponent;@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {myComponent.checkMessage(record.value());return record; // 继续发送}@Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}@Overridepublic void close() {// 资源清理}@Overridepublic void configure(Map<String, ?> configs) {myComponent = configs.get("myComponent");}
}

设置拦截器

import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;@Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {@Autowiredprivate MyComponent myComponent;@Overridepublic void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {producerFactory.updateConfigs(Map.of("myComponent", myComponent,ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}
}

3 基于Spring-Kafka管理的拦截器

基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别(ConsumerRecords),如果要对单条消息拦截,可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。

3.1 单条消息拦截接口定义

由于此拦截器是受Spring容器管理的,所以可以通过@Component注解自动注入到容器中,进行自动拦截。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;@Component
public class CustomRecordInterceptor implements RecordInterceptor<Object, Object> {@Overridepublic ConsumerRecord<Object, Object> intercept(ConsumerRecord<Object, Object> record) {System.out.println(record.topic());return record;}
}

相关文章:

SpringKafka生产者、消费者消息拦截

1 前言 在Spring Kafka中&#xff0c;可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。 2 基于Kafka管理的拦截器 Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和 org.apache.kafka.cli…...

Qt报错QOCI driver not loaded且QOCI available的解决方法

参考 Linux Qt 6安装Oracle QOCI SQL Driver插件&#xff08;适用WSL&#xff09; 安装 QOCI 插件完成后运行 Qt 项目报错&#xff1a; qt.sql.qsqldatabase: QSqlDatabase: QOCI driver not loaded qt.sql.qsqldatabase: QSqlDatabase: available drivers: QMIMER QPSQL QODBC…...

python mac vscode 脚本文件的运行

切换到脚本文件的目录下 路径的修改 当前文件组织形式&#xff1a; 脚本文件在文件夹下&#xff1a; 赋予权限&#xff1a;chmod x ./scripts/fscd_test.sh 运行&#xff1a;./scripts/fscd_test.sh...

Linux之du命令

华子目录 du命令常用选项示例注意事项 du命令 du&#xff08;Disk Usage&#xff09;命令是用于在类Unix操作系统&#xff08;如Linux和macOS&#xff09;中显示文件和目录所占用的磁盘空间大小的工具。它可以递归地计算目录和文件的磁盘使用情况&#xff0c;并提供详细的报告…...

WRF-LES与PALM微尺度气象大涡模拟

针对微尺度气象的复杂性&#xff0c;大涡模拟&#xff08;LES&#xff09;提供了一种无可比拟的解决方案。微尺度气象学涉及对小范围内的大气过程进行精确模拟&#xff0c;这些过程往往与天气模式、地形影响和人为因素如城市布局紧密相关。在这种规模上&#xff0c;传统的气象模…...

桌面程序开发框架选择

桌面程序开发框架选择 1、WinForm(Windows Form)优点缺点 2、WPF(Windows Presentation Foundation)优点缺点 3、Electron优点缺点 4、Delphi优点缺点 5、QT优点缺点 6、MFC(Microsoft Foundation Class Library)优点缺点 7、JavaFX优点缺点 8、SwingAWT9、Avalonia10、Flutter…...

Vue项目开发:Vuex使用,表单验证配置,ESLint关闭与常见问题解决方案

文章目录 vuexvue配置form表单验证移除vue中表单验证的两种方法关闭vue项目的eslint代码校验做vue项目出现的问题 vuex Vue提供的状态管理工具&#xff0c;用于统一管理我们项目中各种数据的交互和重用&#xff0c;存储我们需要用到的数据对象属性 state&#xff1a;vuex的基本…...

源鲁杯2024赛题复现Web Misc部分WP

MISC [Round 1] whatmusic 拿到题目&#xff0c;一个password和加密的压缩包 查看password的文件尾 这里会发现是png文件的文件头&#xff0c;逆序输出&#xff0c;并保存为1.png。得到一个图片&#xff0c; 进行CRC爆破&#xff0c;发现宽高被修改&#xff0c;之后拿到压缩…...

【企业微信新版sdk】

wecom 的引入使用 一、引入wecom二、封装函数三、使用测试 一、引入wecom 1、企业微信 WECOM-JSSDK提供了 npm 和 cdn 两种引入途径。1.1、 npm 引入 npm install wecom/jssdk1.2、安装后引入 import * as ww from wecom/jssdk通过 script 标签引入 <script src"ht…...

web安全测试渗透案例知识点总结(下)——小白入狱

目录 [TOC](目录)一、更多详细的实际案例教程案例1&#xff1a;文件上传漏洞利用案例2&#xff1a;目录遍历&#xff08;Path Traversal&#xff09;漏洞检测案例3&#xff1a;暴力破解登录密码案例4&#xff1a;命令注入漏洞案例5&#xff1a;身份认证绕过&#xff08;Passwor…...

【专题】数据库的安全性

1. 数据库安全性概述 数据库存在的不安全因素&#xff1a; 非授权用户对数据库的恶意存取和破坏&#xff1b; 数据库中重要或敏感的数据被泄露&#xff1b; 安全环境的脆弱性。 数据库的安全性与计算机系统的安全性&#xff0c;包括计算机硬件、操作系统、网络系统等的安全…...

【含开题报告+文档+源码】基于Java的房屋租赁服务系统设计与实现

开题报告 随着城市化进程的加速和人口流动性的增加&#xff0c;租房需求不断增长。传统的租赁方式往往存在信息不对称、流程不规范等问题&#xff0c;使得租户和房东的租赁体验不佳。而而房屋租赁系统能够提供便捷、高效的租赁服务&#xff0c;满足租户和房东的需求。房屋租赁…...

数据结构模拟题[十]

数据结构试卷&#xff08;十&#xff09; 一、选择题 (24 分) 1&#xff0e;下列程序段的时间复杂度为&#xff08; &#xff09;。 i0 &#xff0c;s0&#xff1b; while (s<n) {ssi &#xff1b;i &#xff1b;} (A) O(n 1/2 ) (B) O(n 1/3 ) (C) O(n) (D) O(n 2 ) …...

Java基于微信小程序的美食推荐系统(附源码,文档)

博主介绍&#xff1a;✌程序猿徐师兄、8年大厂程序员经历。全网粉丝15w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…...

基于CNN-RNN的影像报告生成

项目源码获取方式见文章末尾&#xff01; 600多个深度学习项目资料&#xff0c;快来加入社群一起学习吧。 《------往期经典推荐------》 项目名称 1.【PaddleNLP的FAQ问答机器人】 2.【卫星图像道路检测DeepLabV3Plus模型】 3.【GAN模型实现二次元头像生成】 4.【CNN模型实现…...

MacOS如何读取磁盘原始的扇区内容,恢复误删除的数据

MacOS 也是把磁盘当成一个文件&#xff0c;也是可以使用 dd来读取&#xff0c;命行令行如下&#xff1a; sudo dd if/dev/disk2 bs512 count1 skip100 ofsector_100.bin 这个就是读取 /dev/disk2这个磁盘每100这个sector, bs表示扇区大小是512. 但是你直接用读&#xff0c;应…...

创客匠人:打造IP陷入迷茫?20位大咖直播如何破局,实现财富增长

就在明天&#xff01; 11月4日-8日&#xff0c;全球创始人IP领袖峰会线上启动会——《2025年知识IP创新增长训练营*20位各界大咖标杆集体赋能》直播活动&#xff0c;即将重磅来袭&#xff01; 20位行业大咖5大篇章主题内容的强大组合&#xff0c;将为你逐一精彩呈现&#xff…...

视觉目标检测标注xml格式文件解析可视化 - python 实现

视觉目标检测任务&#xff0c;通常用 labelimage标注&#xff0c;对应的标注文件为xml。 该示例来源于开源项目&#xff1a;https://gitcode.com/DataBall/DataBall-detections-100s/overview 读取 xml 标注文件&#xff0c;并进行可视化示例如下&#xff1a; #-*-coding:ut…...

clion远程配置docker ros2

CLION与docker中的ROS2环境构建远程连接 设备前提开启SSH服务CLION配置CLION配置CLION IDE远程连接过程实现CLION SSH 远程部署 开启fastlio2debug之旅 设备前提 本地宿主机&#xff1a;UBUNTU 20.04 docker container:ros2_container (内置环境ROS2 humble) 通过之前的tcp连接…...

微信小程序 uniapp 腾讯地图的调用

/* 提前在您的app.json上加上这些代码 "permission": { "scope.userLocation": { "desc": "你的位置信息将用于地图中定位" } …...

第19节 Node.js Express 框架

Express 是一个为Node.js设计的web开发框架&#xff0c;它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用&#xff0c;和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

QMC5883L的驱动

简介 本篇文章的代码已经上传到了github上面&#xff0c;开源代码 作为一个电子罗盘模块&#xff0c;我们可以通过I2C从中获取偏航角yaw&#xff0c;相对于六轴陀螺仪的yaw&#xff0c;qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...

YSYX学习记录(八)

C语言&#xff0c;练习0&#xff1a; 先创建一个文件夹&#xff0c;我用的是物理机&#xff1a; 安装build-essential 练习1&#xff1a; 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件&#xff0c;随机修改或删除一部分&#xff0c;之后…...

为什么需要建设工程项目管理?工程项目管理有哪些亮点功能?

在建筑行业&#xff0c;项目管理的重要性不言而喻。随着工程规模的扩大、技术复杂度的提升&#xff0c;传统的管理模式已经难以满足现代工程的需求。过去&#xff0c;许多企业依赖手工记录、口头沟通和分散的信息管理&#xff0c;导致效率低下、成本失控、风险频发。例如&#…...

服务器--宝塔命令

一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行&#xff01; sudo su - 1. CentOS 系统&#xff1a; yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...

AI病理诊断七剑下天山,医疗未来触手可及

一、病理诊断困局&#xff1a;刀尖上的医学艺术 1.1 金标准背后的隐痛 病理诊断被誉为"诊断的诊断"&#xff0c;医生需通过显微镜观察组织切片&#xff0c;在细胞迷宫中捕捉癌变信号。某省病理质控报告显示&#xff0c;基层医院误诊率达12%-15%&#xff0c;专家会诊…...

PAN/FPN

import torch import torch.nn as nn import torch.nn.functional as F import mathclass LowResQueryHighResKVAttention(nn.Module):"""方案 1: 低分辨率特征 (Query) 查询高分辨率特征 (Key, Value).输出分辨率与低分辨率输入相同。"""def __…...

虚拟电厂发展三大趋势:市场化、技术主导、车网互联

市场化&#xff1a;从政策驱动到多元盈利 政策全面赋能 2025年4月&#xff0c;国家发改委、能源局发布《关于加快推进虚拟电厂发展的指导意见》&#xff0c;首次明确虚拟电厂为“独立市场主体”&#xff0c;提出硬性目标&#xff1a;2027年全国调节能力≥2000万千瓦&#xff0…...

scikit-learn机器学习

# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...

代码规范和架构【立芯理论一】(2025.06.08)

1、代码规范的目标 代码简洁精炼、美观&#xff0c;可持续性好高效率高复用&#xff0c;可移植性好高内聚&#xff0c;低耦合没有冗余规范性&#xff0c;代码有规可循&#xff0c;可以看出自己当时的思考过程特殊排版&#xff0c;特殊语法&#xff0c;特殊指令&#xff0c;必须…...