kafka Interceptors and Listeners
Interceptors
ProducerInterceptor
https://www.cnblogs.com/huxi2b/p/7072447.html
Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。
对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain),按照指定顺序调用它们.
API
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {//该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区**前**调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);//该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率void onAcknowledgement(RecordMetadata metadata, Exception exception);//关闭interceptor,主要用于执行一些资源清理工作void close();
}
demo
public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> props = new HashMap();props.put("bootstrap.servers", "localhost:9092");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RawSerializer.class);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, RawSerializer.class);List<String> interceptors = new ArrayList<>();interceptors.add("cn.jhs.kakfa.p.interceptor.TimeStampInterceptor"); // interceptor 1interceptors.add("cn.jhs.kakfa.p.interceptor.CounterInterceptor"); // interceptor 2props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);String topic = "test-topic";Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message:" + i);producer.send(record).get();}// 一定要关闭producer,这样才会调用interceptor的close方法producer.close();}
}
ConsumerInterceptor
https://blog.csdn.net/warybee/article/details/121980296
消费者(Consumer)在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理,处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。
- ConsumerInterceptor允许拦截甚至更改消费者接收到的消息。
- 常用在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等。
- ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了 错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
- 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。
- 从调用 KafkaConsumer.poll(long) 的同一线程调用 ConsumerInterceptor 回调。
API
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {/**该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。该方法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或生成新的消息。*/ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);/**当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。调用者将忽略此方法抛出的任何异常。*/void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);/*** 关闭Interceptor之前调用*/void close();
}
配置
//如果有多个拦截器,用,分割即可
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");
Listeners
ProducerListener
https://blog.csdn.net/u014494148/article/details/125344184
Kafka提供了生产者监听器 ProducerListener,他的作用类似于带回调的KafkaTemplate#send(callback) ; 可以监听到消息发送成功或者失败。ProducerListener 提供了onSuccess 成功回调,和 onError 失败回调,如下:
API
public interface ProducerListener<K, V> {/*** Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).*/default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {}/*** Invoked after an attempt to send a message has failed.*/default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata,Exception exception) {}}
自定义Listener
public class MyProducerListener<K, V> implements ProducerListener<K, V> {private FallbackHandler<K, V> fallbackHandler;@Overridepublic void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {//fallbackHandler.process.//write error metrics...}@Overridepublic void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {//write success metrics...}
}
demo(KafkaTemplate.setProducerListener())
public KafkaTemplate<Object, Object> buildKafkaTemplate(Map<String, Object> props) {ProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(props);KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(factory);MyProducerListener<Object, Object> listener1 = new MyProducerListener<>();listener1.setFallbackHandler(fallbackHandler);kafkaTemplate.setProducerListener(listener1);return kafkaTemplate;}
KafkaListenerErrorHandler
当@KafkaListener方法抛出异常时调用的错误处理程序.
API
@FunctionalInterface
public interface KafkaListenerErrorHandler {/*** Handle the error.*/Object handleError(Message<?> message, ListenerExecutionFailedException exception);
}
自定义CustomKafkaListenerErrorHandler(当异常过多时,暂停消费)
/*** 可以通过:* @org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")* 来引入该配置*/
@Component
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {//记录了所有的 kafka MessageListenerContainerprivate final KafkaListenerEndpointRegistry endpointRegistry;public CustomKafkaListenerErrorHandler(KafkaListenerEndpointRegistry endpointRegistry) {this.endpointRegistry = endpointRegistry;}@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException exception) {// 处理异常// 暂停消费者String listenerId = exception.getGroupId();MessageListenerContainer listenerContainer = endpointRegistry.getListenerContainer(listenerId);listenerContainer.pause();//滑动窗口算法 ---// 休眠一段时间(例如 30秒)try {Thread.sleep(30000); // 暂停 30 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 恢复消费者listenerContainer.resume();return null;}
}
demo
@org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")
Callback
producer.Callback
public interface Callback {//processed befeore listener...void onCompletion(RecordMetadata metadata, Exception exception);
}
demo
producer.send(producerRecord, (recordMetadata, exception) -> {if (exception == null) {System.out.println("Record written to offset " +recordMetadata.offset() + " timestamp " +recordMetadata.timestamp());} else {System.err.println("An error occurred");exception.printStackTrace(System.err);}
});
相关文章:
kafka Interceptors and Listeners
Interceptors ProducerInterceptor https://www.cnblogs.com/huxi2b/p/7072447.html Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。 对于producer而言&…...
【面试题】mysql常见面试题及答案总结
事务中的ACID原则是什么? Mysql是如何实现或者保障ACID的? ACID原则是数据库事务管理中必须满足的四个基本属性,确保了数据库事务的可靠性和数据完整性。 简写全称解释实现A原子性(Atomicity)一个事务被视为一个不可分割的操作序列&#…...
C++ 类的前向声明的用法
我们知道C的类应当是先定义,然后使用。但在处理相对复杂的问题、考虑类的组合时,很可能遇到俩个类相互引用的情况,这种情况称为循环依赖。 例如: class A { public:void f(B b);//以B类对象b为形参的成员函数//这里编译错位&…...
二分查找(c语言)
二分查找 一.什么是二分查找二.代码实现 一.什么是二分查找 在⼀个升序的数组中查找制定的数字n,很容易想到的⽅法就是遍历数组,但是这种⽅法效率⽐较低, ⽐如我买了⼀双鞋,你好奇问我多少钱,我说不超过300元。你还是好…...
【记录31】elementUI el-tree 虚线、右键、拖拽
父组件 <eltree :treeData"treeData"></eltree>import eltree from "../../components/tree.vue"; export default {name: ,components: { // org_tree ,eltree},watch: {},data() {return {orgFormchoose: {},orgForm: { type: 0, limits: 1…...
【C++】函数重载
🦄个人主页:修修修也 🎏所属专栏:C ⚙️操作环境:Visual Studio 2022 目录 📌函数重载的定义 📌函数重载的三种类型 🎏参数个数不同 🎏参数类型不同 🎏参数类型顺序不同 📌重载…...
【深度学习模型】6_3 语言模型数据集
注:本文为《动手学深度学习》开源内容,部分标注了个人理解,仅为个人学习记录,无抄袭搬运意图 6.3 语言模型数据集(周杰伦专辑歌词) 本节将介绍如何预处理一个语言模型数据集,并将其转换成字符级…...
技术选型思考:分库分表和分布式DB(TiDB/OceanBase) 的权衡与抉择
码到三十五 : 个人主页 心中有诗画,指尖舞代码,目光览世界,步履越千山,人间尽值得 ! 在当今数据爆炸的时代,数据库作为存储和管理数据的核心组件,其性能和扩展性成为了企业关注的重点。随着业…...
React改变数据【案例】
State传统方式 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>React Demo</title> <!--…...
ChatGPT Plus 自动扣费失败,如何续订
ChatGPT Plus 自动扣费失败,如何续订 如果您的 ChatGPT Plus 订阅过期或扣费失败,本教程将指导您如何重新订阅。 本周更新 ChatGPT Plus 是一种每月20美元的订阅服务。扣费会自动进行,如果您的账户余额不足,OpenAI 将在一次扣费…...
Rust: Channel 代码示例
在 Rust 中,通道(Channel)通常使用 std::sync::mpsc(多生产者单消费者)或 tokio::sync::mpsc(在异步编程中,特别是使用 Tokio 运行时)来创建。下面是一个使用 std::sync::mpsc 的简单…...
基于华为atlas的unet分割模型探索
Unet模型使用官方基于kaggle Carvana Image Masking Challenge数据集训练的模型。 模型输入为572*572*3,输出为572*572*2。分割目标分别为,0:背景,1:汽车。 Pytorch的pth模型转化onnx模型: import torchf…...
机器学习--循环神经网络(RNN)1
一、简介 循环神经网络(Recurrent Neural Network)是深度学习领域中一种非常经典的网络结构,在现实生活中有着广泛的应用。以槽填充(slot filling)为例,如下图所示,假设订票系统听到用户说&…...
基于java+springboot+vue实现的学生信息管理系统(文末源码+Lw+ppt)23-54
摘 要 人类现已进入21世纪,科技日新月异,经济、信息等方面都取得了长足的进步,特别是信息网络技术的飞速发展,对政治、经济、军事、文化等方面都产生了很大的影响。 利用计算机网络的便利,开发一套基于java的大学生…...
【漏洞复现】Linksys E2000 position.js 身份验证绕过漏洞(CVE-2024-27497)
0x01 产品简介 Linksys E2000是一款由思科(Cisco)品牌推出的无线路由器,它是一款支持2.4GHz和5GHz双频段的无线路由器,用户可以避开拥挤的2.4GHz频段,独自享受5GHz频段的高速无线生活。 0x02 漏洞概述 Linksys E200…...
小白跟做江科大51单片机之DS1302可调时钟
原理部分 1.DS1302可调时钟介绍 单片机定时器主要占用CPU时间,掉电不能继续运行 图1 2.原理 图2 内部有寄存器,寄存的时候以时分秒寄存,以通信协议实现数据交互,就可以实现对数据进行访问和读写 3.主要寄存器定义 CE芯片使能…...
2024蓝桥杯每日一题(归并排序)
一、第一题:火柴排队 解题思路:归并排序 重点在于想清楚是对哪个数组进行归并排序求逆序对 【Python程序代码】 from math import * n int(input()) a list(map(int,input().split())) b list(map(int,input().split())) na,nb [],[] for …...
生成对抗网络 (GAN)
生成对抗网络(Generative Adversarial Networks,GAN)是由Ian Goodfellow等人在2014年提出的一种深度学习模型。GAN由两部分组成:一个生成器(Generator)和一个判别器(Discriminator)&…...
QGridLayout网格布局和QVBoxLayout垂直布局有着非常大的差别
QGridLayout网格布局:1.把这块控件划分成一个个的 单元格 2.把你的控件填充进入 单元格 3.这些有关限制大小的函数接口统统失效 setMaximumWidth() setMinimumWidth() setPolicySize()图示:我是用的网格布局,左边放QT…...
HCIA-HarmonyOS设备开发认证V2.0-习题2
目录 习题一习题二坚持就有收获 习题一 # 判断题## 1.PWM占空比指的是低电平时间占周期时间的百分比。(错误)正确(True)错误(False)解题: - PWM占空比指的是高电平时间占周期时间的百分比## 2.UART是通用异步收发传输器,是通用串行数据总线,…...
ubuntu搭建nfs服务centos挂载访问
在Ubuntu上设置NFS服务器 在Ubuntu上,你可以使用apt包管理器来安装NFS服务器。打开终端并运行: sudo apt update sudo apt install nfs-kernel-server创建共享目录 创建一个目录用于共享,例如/shared: sudo mkdir /shared sud…...
什么是库存周转?如何用进销存系统提高库存周转率?
你可能听说过这样一句话: “利润不是赚出来的,是管出来的。” 尤其是在制造业、批发零售、电商这类“货堆成山”的行业,很多企业看着销售不错,账上却没钱、利润也不见了,一翻库存才发现: 一堆卖不动的旧货…...
如何将联系人从 iPhone 转移到 Android
从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...
从零开始打造 OpenSTLinux 6.6 Yocto 系统(基于STM32CubeMX)(九)
设备树移植 和uboot设备树修改的内容同步到kernel将设备树stm32mp157d-stm32mp157daa1-mx.dts复制到内核源码目录下 源码修改及编译 修改arch/arm/boot/dts/st/Makefile,新增设备树编译 stm32mp157f-ev1-m4-examples.dtb \stm32mp157d-stm32mp157daa1-mx.dtb修改…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
10-Oracle 23 ai Vector Search 概述和参数
一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI,使用客户端或是内部自己搭建集成大模型的终端,加速与大型语言模型(LLM)的结合,同时使用检索增强生成(Retrieval Augmented Generation &#…...
Redis的发布订阅模式与专业的 MQ(如 Kafka, RabbitMQ)相比,优缺点是什么?适用于哪些场景?
Redis 的发布订阅(Pub/Sub)模式与专业的 MQ(Message Queue)如 Kafka、RabbitMQ 进行比较,核心的权衡点在于:简单与速度 vs. 可靠与功能。 下面我们详细展开对比。 Redis Pub/Sub 的核心特点 它是一个发后…...
使用LangGraph和LangSmith构建多智能体人工智能系统
现在,通过组合几个较小的子智能体来创建一个强大的人工智能智能体正成为一种趋势。但这也带来了一些挑战,比如减少幻觉、管理对话流程、在测试期间留意智能体的工作方式、允许人工介入以及评估其性能。你需要进行大量的反复试验。 在这篇博客〔原作者&a…...
【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
Python竞赛环境搭建全攻略
Python环境搭建竞赛技术文章大纲 竞赛背景与意义 竞赛的目的与价值Python在竞赛中的应用场景环境搭建对竞赛效率的影响 竞赛环境需求分析 常见竞赛类型(算法、数据分析、机器学习等)不同竞赛对Python版本及库的要求硬件与操作系统的兼容性问题 Pyth…...
