当前位置: 首页 > article >正文

kafka生产端之拦截器、分区器、序列化器

文章目录

  • 拦截器
  • 序列化器
  • 分区器

拦截器

拦截器(Interceptor)是早在Kafka0.10.0.0中就已经引入的一个功能,Kafka一共有两种拦截器:生产者拦截器和消费者拦截器。本节主要讲述生产者拦截器的相关内容,有关消费者拦截器的具体细节先不讲。

生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。

生产者拦截器的使用也很方便,主要是自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口。ProducerInterceptor接口中包含3个方法:

``
public ProducerRecord<K,V> onSend(ProducerRecord<K,V> record);

public void onAcknowledgement (RecordMetadata metadata,Exception exception);

public void close();
``

KafkaProducer在将消息序列化和计算分区之前会调用生产者拦截器的onSend0方法来对消息进行相应的定制化操作。一般来说最好不要修改消息ProducerRecord的topic、key和partition等信息,如果要修改,则需确保对其有准确的判断,否则会与预想的效果出现偏差。比如修改key不仅会影响分区的计算,同样会影响broker端日志压缩(LogCompaction)的功能。

KafkaProducer会在消息被应答(Acknowledgement)之前或消息发送失败时调用生产者拦截器的onAcknowledgementO方法,优先于用户设定的Caliback之前执行。这个方法运行在Producer 的IO线程中,所以这个方法中实现的代码逻辑越简单越好,否则会影响消息的发送速度。

close0方法主要用于在关闭拦截器时执行一些资源的清理工作。在这3个方法中抛出的异常都会被捕获并记录到日志中,但并不会再向上传递。

自定义拦截器时需要实现ProducerInterceptor接口,并在配置类或者配置中设置需要添加的拦截器,多个拦截器存在时会根据拦截器的添加顺序(配置类或者配置中)按顺序执行。

序列化器

生产者需要用序列化器(Serializer)把对象转换成字节数组才能通过网络发送给Kafka。而在对侧,消费者需要用反序列化器(Deserializer)把从Kafka中收到的字节数组转换成相应的对象。除了用于String类型的序列化器,还有ByteArray、ByteBuffer、Bytes、Double、Integer、Long这几种类型,它们都实现了orgapache.kafkka.common.serialization.Serializer接口。

生产者使用的序列化器和消费者使用的反序列化器是需要一一对应的,如果生产者使用了某种序列化器,比如StringSerializer,而消费者使用了另一种序列化器,比如IntegerSerializer那么是无法解析出想要的数据的。

如果Kafka客户端提供的几种序列化器都无法满足应用需求,则可以选择使用如Avro、JSON、Thrift、ProtoBuf和Protostuff等通用的序列化工具来实现,或者使用自定义类型的序列化器来实现。自定义序列化器需要实现Serializer接口,并在配置类或者配置中设置该自定义序列化器为生产端序列化器。

分区器

消息在通过send()方法发往broker的过程中,有可能需要经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)的一系列作用之后才能被真正地发往broker。拦截器一般不是必需的,而序列化器是必需的。消息经过序列化之后就需要确定它发往的分区,如果消息ProducerRecord中指定了partition字段,那么就不需要分区器的作用,因为partition代表的就是所要发往的分区号。

如果消息ProducerRecord中没有指定partition字段,那么就需要依赖分区器,根据key这个字段来计算partition的值。分区器的作用就是为消息分配分区。

Kafka中提供的默认分区器是org.apache.kafka.clients.producer.intemals.DefaultPartitioner,它实现了org.apache.kafka.clients.producer.Partitioner接口。

接口中partition()方法用来计算分区号,返回值为int类型。partition()方法中的参数分别表示主题、键、序列化后的键、值、序列化后的值,以及集群的元数据信息,通过这些信息可以实现功能丰富的分区器。close()方法在关闭分区器的时候用来回收一些资源。

在默认分区器DefaultPartitioner的实现中,closeO是空方法,而在partition0方法中定义了主要的分区分配逻辑。如果key不为null,那么默认的分区器会对key进行哈希,最终根据得到的哈希值来计算分区号,拥有相同key的消息会被写入同一个分区。如果key为null,那么消息将会以轮询的方式发往主题内的各个可用分区。

在不改变主题分区数量的情况下,key与分区之间的映射可以保持不变。不过,一旦主题中增加了分区,那么就难以保证key与分区之间的映射关系了。

除了使用Kafka提供的默认分区器进行分区分配,还可以使用自定义的分区器,只需同DefaultPartitioner一样实现Partitioner接口即可。默认的分区器在key为null时不会选择非可用的分区,我们可以通过自定义的分区器DemoPartitioner来打破这一限制。如果自定义分区器还是需要在配置类或者配置中设置分区器为自定义的分区器。

相关文章:

kafka生产端之拦截器、分区器、序列化器

文章目录 拦截器序列化器分区器 拦截器 拦截器&#xff08;Interceptor&#xff09;是早在Kafka0.10.0.0中就已经引入的一个功能&#xff0c;Kafka一共有两种拦截器&#xff1a;生产者拦截器和消费者拦截器。本节主要讲述生产者拦截器的相关内容&#xff0c;有关消费者拦截器的…...

BFS算法篇——广度优先搜索,探索未知的旅程(上)

文章目录 前言一、BFS的思路二、BFS的C语言实现1. 图的表示2. BFS的实现 三、代码解析四、输出结果五、总结 前言 广度优先搜索&#xff08;BFS&#xff09;是一种广泛应用于图论中的算法&#xff0c;常用于寻找最短路径、图的遍历等问题。与深度优先搜索&#xff08;DFS&…...

FPGA VGA timing

概念 VGA(Video Graphics Array)时序是控制VGA接口显示图像的关键参数,它主要包括行时序和场时序两部分。以下是对VGA时序的详细解释: 一、VGA接口简介 VGA接口是IBM公司在1987年推出的一种使用模拟信号的视频传输标准,具有成本低、结构简单、应用灵活等优点,至今仍被广…...

pytest生成报告no tests ran in 0.01s

除了基本的环境配置、用例名要以test_开头&#xff0c;有个地方是我自己忽略了&#xff0c;在执行时没有指定用例文件&#xff0c;所以没有找到。 if __name__ __main__:pytest.main(["testcases/test_demo.py","-svq", __file__, --alluredir./allure-r…...

Django开发入门 – 0.Django基本介绍

Django开发入门 – 0.Django基本介绍 A Brief Introduction to django By JacksonML 1. Django简介 1) 什么是Django? 依据其官网的一段解释&#xff1a; Django is a high-level Python web framework that encourages rapid development and clean, pragmatic design. …...

数巅科技中标科学城数科集团AI辅助企业数字化转型评估诊断

自2023年以来&#xff0c;财政部和工信部连续发布通知&#xff0c;强调要做好中小企业数字化转型城市试点工作&#xff0c;鼓励试点城市大力支持优质数字化服务商&#xff0c;研发攻关一批“小快轻准”数字化产品和解决方案&#xff0c;助力制造业关键领域的中小企业实现数字化…...

Linux proc虚拟文件系统

文章目录 简介proc常用节点pid节点procfs接口参考 简介 测试环境&#xff1a;Linux dev-PC 5.18.17-amd64-desktop-hwe #20.01.00.10 SMP PREEMPT_DYNAMIC Thu Jun 15 16:17:50 CST 2023 x86_64 GNU/Linux proc虚拟文件系统是linux内核提供的一种让用户和内核内部数据结构进行交…...

idea整合deepseek实现AI辅助编程

1.File->Settings 2.安装插件codegpt 3.注册deepseek开发者账号&#xff0c;DeepSeek开放平台 4.按下图指示创建API KEY 5.回到idea配置api信息&#xff0c;File->Settings->Tools->CodeGPT->Providers->Custom OpenAI API key填写deepseek的api key Chat…...

局域网内别的电脑怎么连接到对方的mysql数据库

要让局域网内的其他电脑连接到一台主机上的 MySQL 数据库,你需要进行一些配置,包括 MySQL 服务器的设置、权限调整,以及客户端连接的步骤。以下是详细的步骤说明: 1. 确保 MySQL 服务器允许远程连接 默认情况下,MySQL 服务器可能只允许本地连接(localhost)。你需要修改…...

加速汽车软件升级——堆栈刷写技术的应用与挑战

一、背景和挑战 | 背景&#xff1a; 当前汽车市场竞争激烈&#xff0c;多品牌并存&#xff0c;新车发布速度加快&#xff0c;价格逐渐降低&#xff0c;功能日益多样化。随着车辆功能的不断提升与优化&#xff0c;ECU&#xff08;电子控制单元&#xff09;的代码量也随之增加&…...

2. UVM的基本概念和架构

文章目录 前言1. UVM的基本概念1.1 UVM的核心组件1.2 UVM的基本架构1.3 UVM的工作流程 2. UVM的架构2.1 UVM的层次结构2.2 UVM的组件交互 3. 总结 前言 首先&#xff0c;得确定UVM的基本概念和架构包含哪些关键部分。我回忆起UVM的核心组件&#xff0c;比如uvm_component、uvm…...

【力扣】138.随机链表的复制

AC截图 题目 代码 使用哈希存储<旧节点&#xff0c;新结点> /* // Definition for a Node. class Node { public:int val;Node* next;Node* random;Node(int _val) {val _val;next NULL;random NULL;} }; */class Solution { public:Node* copyRandomList(Node* hea…...

防火墙、堡垒机和NAT

在网络安全中&#xff0c;防火墙、堡垒机&#xff08;Cloud Monitoring and Protection Machine&#xff09;和网络地址转换&#xff08;NAT&#xff09; 是三种核心设备&#xff0c;用于防御外来的访问和破坏性攻击。然而&#xff0c;这三种设备本身也可能面临多种网络安全威胁…...

归一化与伪彩:LabVIEW图像处理的区别

在LabVIEW的图像处理领域&#xff0c;归一化&#xff08;Normalization&#xff09;和伪彩&#xff08;Pseudo-coloring&#xff09;是两个不同的概念&#xff0c;虽然它们都涉及图像像素值的调整&#xff0c;但目的和实现方式截然不同。归一化用于调整像素值的范围&#xff0c…...

动态表格html

题目&#xff1a; 要求&#xff1a; 1.表格由专业班级学号1-10号同学的信息组成&#xff0c;包括&#xff1a;学号、姓 名、性别、二级学院、班级、专业、辅导员&#xff1b; 2.表格的奇数行字体为黑色&#xff0c;底色为白色&#xff1b;偶数行字体为白色&#xff0c;底 色为黑…...

通过k8s请求selfsubjectrulesreviews查询权限

当前是通过kubelet进行查询 curl --cacert /etc/kubernetes/pki/ca.crt \ --cert /var/lib/kubelet/pki/kubelet-client-current.pem \ --key /var/lib/kubelet/pki/kubelet-client-current.pem \ -d - \ -H "Content-Type: application/json" \ -H Accept: applicat…...

Leetcode 3447. Assign Elements to Groups with Constraints

Leetcode 3447. Assign Elements to Groups with Constraints 1. 解题思路2. 代码实现 题目链接&#xff1a;3447. Assign Elements to Groups with Constraints 1. 解题思路 这一题的话思路上我是预先算出可能数字对应的element&#xff0c;然后只要一次query就行了。 而至…...

Ollama + AnythingLLM + Deepseek r1 实现本地知识库

1、Ollama&#xff1a;‌是一个开源的大型语言模型 (LLM)服务工具&#xff0c;旨在简化在本地运行大语言模型的过程&#xff0c;降低使用大语言模型的门槛‌。 2、AnythingLLM&#xff1a;是由Mintplex Labs Inc. 开发的一款全栈应用程序&#xff0c;旨在构建一个高效、可定制、…...

Prompt逆向工程:如何“骗“大模型吐露其Prompt?

提示词的“逆向工程”&#xff0c;让AI大语言模型帮你反推提示词 一、前言 在日常生活中&#xff0c;我们不时会遇到一些令人惊艳的文本&#xff0c;不论是一篇精彩绝伦的小说、一篇深入浅出的科普文章&#xff0c;还是一篇充满热情的音乐推荐&#xff0c;它们都能在我们的心…...

Deepseek-v3 / Dify api接入飞书机器人go程序

准备工作 开通了接收消息权限的飞书机器人&#xff0c;例如我希望用户跟飞书机器人私聊&#xff0c;就需要开通这个权限&#xff1a;读取用户发给机器人的单聊消息 im:message.p2p_msg:readonly准备好飞书机器人的API key 和Secretdeepseek-v3的api keysecret&#xff1a;http…...

【docker】Failed to allocate manager object, freezing:兼容兼容 cgroup v1 和 v2

参考大神让系统同时兼容 cgroup v1 和 v2 要解决你系统中只挂载了 cgroup v2 但需要兼容 cgroup v1 的问题,可以通过以下几步来使系统同时兼容 cgroup v1 和 cgroup v2。这样 Docker 和其他服务就可以正常工作了。步骤 1:更新 Grub 配置,启用兼容模式 编辑 GRUB 配置来启用同…...

详解策略模式

引言 实现一个目标往往有多种方式&#xff0c;比如从上海到北京&#xff0c;可以选择高铁、火车、飞机、自驾等等。同样实现一个功能我们可能也有多种方法&#xff0c;把这些方法封装为算法&#xff0c;根据不同的需求选择不同的算法&#xff08;策略&#xff09;&#xff0c;让…...

2025影视泛目录站群程序设计_源码二次开发新版本无缓存刷新不变实现原理

1. 引言 本设站群程序计书旨在详细阐述苹果CMS泛目录的创新设计与实现&#xff0c;介绍无缓存刷新技术、数据统一化、局部URL控制及性能优化等核心功能&#xff0c;以提升网站访问速度和用户体验。 2. 技术概述 2.1 无缓存刷新技术 功能特点&#xff1a; 内容不变性&#x…...

【RabbitMQ】RabbitMQ的下载安装及使用

安装RabbitMQ 下载网站&#xff1a;https://www.rabbitmq.com/docs/install-windows 点击后&#xff0c;会直接定位到依赖介绍位置&#xff0c;告诉你需要安装Erlang 下载Erlang Erlang也是一种编程语言&#xff0c;只是比较小众&#xff0c;但其拥有极为出色的性能 这个网站是…...

Stylelint 如何处理 CSS 预处理器

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…...

Word中Ctrl+V粘贴报错问题

Word中CtrlV粘贴时显示“文件未找到&#xff1a;MathPage.WLL”的问题 Word的功能栏中有MathType&#xff0c;但无法使用&#xff0c;显示灰色。 解决方法如下&#xff1a; 首先找到MathType安装目录下MathPage.wll文件以及MathType Commands 2016.dotm文件&#xff0c;分别复…...

jmeter逻辑控制器9

1&#xff0c;简单控制器2&#xff0c;录制控制器3&#xff0c;循环控制器4&#xff0c;随机控制器5&#xff0c;随机顺序控制器6&#xff0c;if控制器7&#xff0c;模块控制器8&#xff0c;Include控制器9&#xff0c;事物控制器本文永久更新地址: 1&#xff0c;简单控制器 不…...

uniapp mqttjs 小程序开发

在UniApp中集成MQTT.js开发微信小程序时&#xff0c;需注意平台差异、协议兼容性及消息处理等问题。以下是关键步骤与注意事项的综合指南&#xff1a; 一、环境配置与依赖安装 安装MQTT.js 推荐使用兼容性较好的版本&#xff1a;mqtt4.1.0&#xff08;H5和小程序兼容性最佳&…...

GitHub Copilot Agent 模式系统提示词

系统提示词 你是一名 AI 编程助手。 当被问及你的名字时&#xff0c;你必须回答“GitHub Copilot”。请严格且完整地遵循用户的要求。 遵守微软内容政策。 避免涉及侵犯版权的内容。如果有人要求你生成有害、仇恨、种族主义、性别歧视、淫秽、暴力或与软件工程完全无关的内容&…...

【设计模式】【行为型模式】模板方法模式(Template Method)

&#x1f44b;hi&#xff0c;我不是一名外包公司的员工&#xff0c;也不会偷吃茶水间的零食&#xff0c;我的梦想是能写高端CRUD &#x1f525; 2025本人正在沉淀中… 博客更新速度 &#x1f4eb; 欢迎V&#xff1a; flzjcsg2&#xff0c;我们共同讨论Java深渊的奥秘 &#x1f…...