当前位置: 首页 > news >正文

kafka Interceptors and Listeners

Interceptors

ProducerInterceptor

https://www.cnblogs.com/huxi2b/p/7072447.html

Producer拦截器(interceptor)是个相当新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于实现clients端的定制化控制逻辑。

对于producer而言,interceptor使得用户在消息发送前以及producer回调逻辑前有机会对消息做一些定制化需求,比如修改消息等。同时,producer允许用户指定多个interceptor按序作用于同一条消息从而形成一个拦截链(interceptor chain),按照指定顺序调用它们.

API

public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {//该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中的。Producer确保在消息被序列化以计算分区**前**调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区的计算ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);//该方法会在消息被应答之前或消息发送失败时调用,并且通常都是在producer回调逻辑触发之前。onAcknowledgement运行在producer的IO线程中,因此不要在该方法中放入很重的逻辑,否则会拖慢producer的消息发送效率void onAcknowledgement(RecordMetadata metadata, Exception exception);//关闭interceptor,主要用于执行一些资源清理工作void close();
}

demo

    public static void main(String[] args) throws ExecutionException, InterruptedException {Map<String, Object> props = new HashMap();props.put("bootstrap.servers", "localhost:9092");props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, RawSerializer.class);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, RawSerializer.class);List<String> interceptors = new ArrayList<>();interceptors.add("cn.jhs.kakfa.p.interceptor.TimeStampInterceptor"); // interceptor 1interceptors.add("cn.jhs.kakfa.p.interceptor.CounterInterceptor"); // interceptor 2props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);String topic = "test-topic";Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> record = new ProducerRecord<>(topic, "message:" + i);producer.send(record).get();}// 一定要关闭producer,这样才会调用interceptor的close方法producer.close();}
}

ConsumerInterceptor

https://blog.csdn.net/warybee/article/details/121980296

消费者(Consumer)在拉取了分区消息之后,要首先经过反序列化器对key和value进行反序列化处理,处理完之后,如果消费端设置了拦截器,则需要经过拦截器的处理之后,才能返回给消费者应用程 序进行处理。

  • ConsumerInterceptor允许拦截甚至更改消费者接收到的消息。
  • 常用在于将第三方组件引入 消费者应用程序,用于定制的监控、日志处理等。
  • ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了 错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。
  • 如果有多个拦截器,则该方法按照KafkaConsumer的configs中配置的顺序调用。
  • 从调用 KafkaConsumer.poll(long) 的同一线程调用 ConsumerInterceptor 回调。

API

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {/**该方法在poll方法返回之前调用。调用结束后poll方法就返回消息了。该方法可以修改消费者消息,返回新的消息。拦截器可以过滤收到的消息或生成新的消息。*/ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);/**当消费者提交偏移量时,调用该方法。通常你可以在该方法中做一些记账类的动作,比如打日志等。调用者将忽略此方法抛出的任何异常。*/void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);/***  关闭Interceptor之前调用*/void close();
}

配置

//如果有多个拦截器,用,分割即可
configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,"com.warybee.interceptor.MyConsumerInterceptor");

Listeners

ProducerListener

https://blog.csdn.net/u014494148/article/details/125344184

Kafka提供了生产者监听器 ProducerListener,他的作用类似于带回调的KafkaTemplate#send(callback) ; 可以监听到消息发送成功或者失败。ProducerListener 提供了onSuccess 成功回调,和 onError 失败回调,如下:

API

public interface ProducerListener<K, V> {/*** Invoked after the successful send of a message (that is, after it has been acknowledged by the broker).*/default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {}/*** Invoked after an attempt to send a message has failed.*/default void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata,Exception exception) {}}

自定义Listener

public class MyProducerListener<K, V> implements ProducerListener<K, V> {private FallbackHandler<K, V> fallbackHandler;@Overridepublic void onError(ProducerRecord<K, V> producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {//fallbackHandler.process.//write error metrics...}@Overridepublic void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {//write success metrics...}
}

demo(KafkaTemplate.setProducerListener())

    public KafkaTemplate<Object, Object> buildKafkaTemplate(Map<String, Object> props) {ProducerFactory<Object, Object> factory = new DefaultKafkaProducerFactory<>(props);KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(factory);MyProducerListener<Object, Object> listener1 = new MyProducerListener<>();listener1.setFallbackHandler(fallbackHandler);kafkaTemplate.setProducerListener(listener1);return kafkaTemplate;}

KafkaListenerErrorHandler

当@KafkaListener方法抛出异常时调用的错误处理程序.

API

@FunctionalInterface
public interface KafkaListenerErrorHandler {/*** Handle the error.*/Object handleError(Message<?> message, ListenerExecutionFailedException exception);
}

自定义CustomKafkaListenerErrorHandler(当异常过多时,暂停消费)

/*** 可以通过:* @org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")* 来引入该配置*/
@Component
public class CustomKafkaListenerErrorHandler implements KafkaListenerErrorHandler {//记录了所有的 kafka MessageListenerContainerprivate final KafkaListenerEndpointRegistry endpointRegistry;public CustomKafkaListenerErrorHandler(KafkaListenerEndpointRegistry endpointRegistry) {this.endpointRegistry = endpointRegistry;}@Overridepublic Object handleError(Message<?> message, ListenerExecutionFailedException exception) {// 处理异常// 暂停消费者String listenerId = exception.getGroupId();MessageListenerContainer listenerContainer = endpointRegistry.getListenerContainer(listenerId);listenerContainer.pause();//滑动窗口算法 ---// 休眠一段时间(例如 30秒)try {Thread.sleep(30000); // 暂停 30 秒} catch (InterruptedException e) {Thread.currentThread().interrupt();}// 恢复消费者listenerContainer.resume();return null;}
}

demo

@org.springframework.kafka.annotation.KafkaListener(errorHandler="customKafkaListenerErrorHandler")

Callback

producer.Callback

public interface Callback {//processed befeore listener...void onCompletion(RecordMetadata metadata, Exception exception);
}

demo

producer.send(producerRecord, (recordMetadata, exception) -> {if (exception == null) {System.out.println("Record written to offset " +recordMetadata.offset() + " timestamp " +recordMetadata.timestamp());} else {System.err.println("An error occurred");exception.printStackTrace(System.err);}
});

相关文章:

kafka Interceptors and Listeners

Interceptors ProducerInterceptor https://www.cnblogs.com/huxi2b/p/7072447.html Producer拦截器(interceptor)是个相当新的功能&#xff0c;它和consumer端interceptor是在Kafka 0.10版本被引入的&#xff0c;主要用于实现clients端的定制化控制逻辑。 对于producer而言&…...

【面试题】mysql常见面试题及答案总结

事务中的ACID原则是什么? Mysql是如何实现或者保障ACID的? ACID原则是数据库事务管理中必须满足的四个基本属性&#xff0c;确保了数据库事务的可靠性和数据完整性。 简写全称解释实现A原子性&#xff08;Atomicity&#xff09;一个事务被视为一个不可分割的操作序列&#…...

C++ 类的前向声明的用法

我们知道C的类应当是先定义&#xff0c;然后使用。但在处理相对复杂的问题、考虑类的组合时&#xff0c;很可能遇到俩个类相互引用的情况&#xff0c;这种情况称为循环依赖。 例如&#xff1a; class A { public:void f(B b);//以B类对象b为形参的成员函数//这里编译错位&…...

二分查找(c语言)

二分查找 一.什么是二分查找二.代码实现 一.什么是二分查找 在⼀个升序的数组中查找制定的数字n&#xff0c;很容易想到的⽅法就是遍历数组&#xff0c;但是这种⽅法效率⽐较低&#xff0c; ⽐如我买了⼀双鞋&#xff0c;你好奇问我多少钱&#xff0c;我说不超过300元。你还是好…...

【记录31】elementUI el-tree 虚线、右键、拖拽

父组件 <eltree :treeData"treeData"></eltree>import eltree from "../../components/tree.vue"; export default {name: ,components: { // org_tree ,eltree},watch: {},data() {return {orgFormchoose: {},orgForm: { type: 0, limits: 1…...

【C++】函数重载

&#x1f984;个人主页:修修修也 &#x1f38f;所属专栏:C ⚙️操作环境:Visual Studio 2022 目录 &#x1f4cc;函数重载的定义 &#x1f4cc;函数重载的三种类型 &#x1f38f;参数个数不同 &#x1f38f;参数类型不同 &#x1f38f;参数类型顺序不同 &#x1f4cc;重载…...

【深度学习模型】6_3 语言模型数据集

注&#xff1a;本文为《动手学深度学习》开源内容&#xff0c;部分标注了个人理解&#xff0c;仅为个人学习记录&#xff0c;无抄袭搬运意图 6.3 语言模型数据集&#xff08;周杰伦专辑歌词&#xff09; 本节将介绍如何预处理一个语言模型数据集&#xff0c;并将其转换成字符级…...

技术选型思考:分库分表和分布式DB(TiDB/OceanBase) 的权衡与抉择

码到三十五 &#xff1a; 个人主页 心中有诗画&#xff0c;指尖舞代码&#xff0c;目光览世界&#xff0c;步履越千山&#xff0c;人间尽值得 ! 在当今数据爆炸的时代&#xff0c;数据库作为存储和管理数据的核心组件&#xff0c;其性能和扩展性成为了企业关注的重点。随着业…...

React改变数据【案例】

State传统方式 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>React Demo</title> <!--…...

ChatGPT Plus 自动扣费失败,如何续订

ChatGPT Plus 自动扣费失败&#xff0c;如何续订 如果您的 ChatGPT Plus 订阅过期或扣费失败&#xff0c;本教程将指导您如何重新订阅。 本周更新 ChatGPT Plus 是一种每月20美元的订阅服务。扣费会自动进行&#xff0c;如果您的账户余额不足&#xff0c;OpenAI 将在一次扣费…...

Rust: Channel 代码示例

在 Rust 中&#xff0c;通道&#xff08;Channel&#xff09;通常使用 std::sync::mpsc&#xff08;多生产者单消费者&#xff09;或 tokio::sync::mpsc&#xff08;在异步编程中&#xff0c;特别是使用 Tokio 运行时&#xff09;来创建。下面是一个使用 std::sync::mpsc 的简单…...

基于华为atlas的unet分割模型探索

Unet模型使用官方基于kaggle Carvana Image Masking Challenge数据集训练的模型。 模型输入为572*572*3&#xff0c;输出为572*572*2。分割目标分别为&#xff0c;0&#xff1a;背景&#xff0c;1&#xff1a;汽车。 Pytorch的pth模型转化onnx模型&#xff1a; import torchf…...

机器学习--循环神经网络(RNN)1

一、简介 循环神经网络&#xff08;Recurrent Neural Network&#xff09;是深度学习领域中一种非常经典的网络结构&#xff0c;在现实生活中有着广泛的应用。以槽填充&#xff08;slot filling&#xff09;为例&#xff0c;如下图所示&#xff0c;假设订票系统听到用户说&…...

基于java+springboot+vue实现的学生信息管理系统(文末源码+Lw+ppt)23-54

摘 要 人类现已进入21世纪&#xff0c;科技日新月异&#xff0c;经济、信息等方面都取得了长足的进步&#xff0c;特别是信息网络技术的飞速发展&#xff0c;对政治、经济、军事、文化等方面都产生了很大的影响。 利用计算机网络的便利&#xff0c;开发一套基于java的大学生…...

【漏洞复现】Linksys E2000 position.js 身份验证绕过漏洞(CVE-2024-27497)

0x01 产品简介 Linksys E2000是一款由思科&#xff08;Cisco&#xff09;品牌推出的无线路由器&#xff0c;它是一款支持2.4GHz和5GHz双频段的无线路由器&#xff0c;用户可以避开拥挤的2.4GHz频段&#xff0c;独自享受5GHz频段的高速无线生活。 0x02 漏洞概述 Linksys E200…...

小白跟做江科大51单片机之DS1302可调时钟

原理部分 1.DS1302可调时钟介绍 单片机定时器主要占用CPU时间&#xff0c;掉电不能继续运行 图1 2.原理 图2 内部有寄存器&#xff0c;寄存的时候以时分秒寄存&#xff0c;以通信协议实现数据交互&#xff0c;就可以实现对数据进行访问和读写 3.主要寄存器定义 CE芯片使能…...

2024蓝桥杯每日一题(归并排序)

一、第一题&#xff1a;火柴排队 解题思路&#xff1a;归并排序 重点在于想清楚是对哪个数组进行归并排序求逆序对 【Python程序代码】 from math import * n int(input()) a list(map(int,input().split())) b list(map(int,input().split())) na,nb [],[] for …...

生成对抗网络 (GAN)

生成对抗网络&#xff08;Generative Adversarial Networks&#xff0c;GAN&#xff09;是由Ian Goodfellow等人在2014年提出的一种深度学习模型。GAN由两部分组成&#xff1a;一个生成器&#xff08;Generator&#xff09;和一个判别器&#xff08;Discriminator&#xff09;&…...

QGridLayout网格布局和QVBoxLayout垂直布局有着非常大的差别

QGridLayout网格布局&#xff1a;1.把这块控件划分成一个个的 单元格 2.把你的控件填充进入 单元格 3.这些有关限制大小的函数接口统统失效 setMaximumWidth&#xff08;&#xff09; setMinimumWidth() setPolicySize()图示&#xff1a;我是用的网格布局&#xff0c;左边放QT…...

HCIA-HarmonyOS设备开发认证V2.0-习题2

目录 习题一习题二坚持就有收获 习题一 # 判断题## 1.PWM占空比指的是低电平时间占周期时间的百分比。(错误)正确(True)错误(False)解题&#xff1a; - PWM占空比指的是高电平时间占周期时间的百分比## 2.UART是通用异步收发传输器&#xff0c;是通用串行数据总线&#xff0c;…...

FastAPI 教程:从入门到实践

FastAPI 是一个现代、快速&#xff08;高性能&#xff09;的 Web 框架&#xff0c;用于构建 API&#xff0c;支持 Python 3.6。它基于标准 Python 类型提示&#xff0c;易于学习且功能强大。以下是一个完整的 FastAPI 入门教程&#xff0c;涵盖从环境搭建到创建并运行一个简单的…...

2024年赣州旅游投资集团社会招聘笔试真

2024年赣州旅游投资集团社会招聘笔试真 题 ( 满 分 1 0 0 分 时 间 1 2 0 分 钟 ) 一、单选题(每题只有一个正确答案,答错、不答或多答均不得分) 1.纪要的特点不包括()。 A.概括重点 B.指导传达 C. 客观纪实 D.有言必录 【答案】: D 2.1864年,()预言了电磁波的存在,并指出…...

将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?

Otsu 是一种自动阈值化方法&#xff0c;用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理&#xff0c;能够自动确定一个阈值&#xff0c;将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...

智能仓储的未来:自动化、AI与数据分析如何重塑物流中心

当仓库学会“思考”&#xff0c;物流的终极形态正在诞生 想象这样的场景&#xff1a; 凌晨3点&#xff0c;某物流中心灯火通明却空无一人。AGV机器人集群根据实时订单动态规划路径&#xff1b;AI视觉系统在0.1秒内扫描包裹信息&#xff1b;数字孪生平台正模拟次日峰值流量压力…...

Python ROS2【机器人中间件框架】 简介

销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...

短视频矩阵系统文案创作功能开发实践,定制化开发

在短视频行业迅猛发展的当下&#xff0c;企业和个人创作者为了扩大影响力、提升传播效果&#xff0c;纷纷采用短视频矩阵运营策略&#xff0c;同时管理多个平台、多个账号的内容发布。然而&#xff0c;频繁的文案创作需求让运营者疲于应对&#xff0c;如何高效产出高质量文案成…...

return this;返回的是谁

一个审批系统的示例来演示责任链模式的实现。假设公司需要处理不同金额的采购申请&#xff0c;不同级别的经理有不同的审批权限&#xff1a; // 抽象处理者&#xff1a;审批者 abstract class Approver {protected Approver successor; // 下一个处理者// 设置下一个处理者pub…...

接口自动化测试:HttpRunner基础

相关文档 HttpRunner V3.x中文文档 HttpRunner 用户指南 使用HttpRunner 3.x实现接口自动化测试 HttpRunner介绍 HttpRunner 是一个开源的 API 测试工具&#xff0c;支持 HTTP(S)/HTTP2/WebSocket/RPC 等网络协议&#xff0c;涵盖接口测试、性能测试、数字体验监测等测试类型…...

基于单片机的宠物屋智能系统设计与实现(论文+源码)

本设计基于单片机的宠物屋智能系统核心是实现对宠物生活环境及状态的智能管理。系统以单片机为中枢&#xff0c;连接红外测温传感器&#xff0c;可实时精准捕捉宠物体温变化&#xff0c;以便及时发现健康异常&#xff1b;水位检测传感器时刻监测饮用水余量&#xff0c;防止宠物…...

用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法

用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法 大家好,我是Echo_Wish。最近刷短视频、看直播,有没有发现,越来越多的应用都开始“懂你”了——它们能感知你的情绪,推荐更合适的内容,甚至帮客服识别用户情绪,提升服务体验。这背后,神经网络在悄悄发力,撑起…...