kafka消费者多线程开发
目录
前言
kafka consumer 设计原理
多线程的方案
参考资料
前言
目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。
kafka consumer 设计原理
从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。
所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。
多线程的方案
我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
由于kafka consumer不是线程安全,我么你能制定两种多线程的方案。
1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。
这两种方案的比较如下:

实现代码示例如下:
方案一的代码:
public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));// 执行消息处理逻辑}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}
这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构
方案2 的代码:
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...private int workerNum = ...;
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());...
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (final ConsumerRecord record : records) {executors.submit(new Worker(record));}
}
..
参考资料
20 | 多线程开发消费者实例-极客时间
相关文章:
kafka消费者多线程开发
目录 前言 kafka consumer 设计原理 多线程的方案 参考资料 前言 目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单…...
布局设计和实现:计算器UI【TableLayout、GridLayout】
一、使用TableLayout实现计算器UI 1.新建一个空白项目布局 根据自己的需求输入其他信息 填写完成后,点击Finish即可 2. 设计UI界面 在res/layout文件夹中的XML文件中创建UI界面。在这个XML文件中,您可以使用TableLayout来设计计算器界面。 2.1 创建l…...
stack与queue的简单封装
前言: stack与queue即栈和队列,先进后出/先进先出的特性我们早已了然于心, 在学习数据结构时,我们利用c语言实现栈与队列,从结构体写起,利用数组或指针表示他们的数据成员,之后再一个个实现他们…...
ChatGPT使用技巧整理
目录 1. 让ChatGPT扮演专家角色2. 告诉ChatGPT你的身份3. 限制ChatGPT的回答长度4. 让ChatGPT一步步思考5. 明确你的要求和目的6. 提供充分的背景信息7. 始终结构化思考你的prompt1. 让ChatGPT扮演专家角色 当你们讨论的是市场营销问题时,你可以要求ChatGPT扮演一个具有20年从…...
机器学习笔记 - 维度诅咒的数学表达
1、点之间的距离 kNN分类器假设相似的点也可能有相同的标签。但是,在高维空间中,从概率分布中得出的点往往不会始终靠近在一起。 我们可以用一个简单的例子来说明这一点。 我们将在单位立方体内均匀地随机绘制点(如图所示),并研究该立方体内测试点的 k 个最近邻将占用多少…...
组合计数训练题解
CF40E 题目链接 点击打开链接 题目解法 首先,如果 n , m n,m n,m 一奇一偶,那么答案为 0 0 0 原因是从行和列的角度分析, − 1 -1 −1 个数的奇偶性不同 可以发现 k < max { n , m } k<\max\{n,m\} k<max{n,m} 的性质很微…...
P1095 [NOIP2007 普及组] 守望者的逃离
[NOIP2007 普及组] 守望者的逃离 - 洛谷 首先DP的套路就是先找状态 这题也找不出其他的状态了,只有时间一个 所以用f[i]表示时刻i能走多远 而仔细一想实际上决策只有跑、闪现、停三种决策 然而闪现的耗蓝要和跑步一同计算十分麻烦 于是把它们分开算࿱…...
Python函数绘图与高等代数互融实例(八):箱线图|误差棒图|堆积图
Python函数绘图与高等代数互融实例(一):正弦函数与余弦函数 Python函数绘图与高等代数互融实例(二):闪点函数 Python函数绘图与高等代数互融实例(三):设置X|Y轴|网格线 Python函数绘图与高等代数互融实例(四):设置X|Y轴参考线|参考区域 Python函数绘图与高等代数互融实例(五…...
联想y7000 y7000p 2018/2019 不插电源 不插充电器, 直接关机 ,电量一直89%/87%/86%,V0005如何解决?
这种问题,没有外力破坏的话,电池不可能突然出事。这种一般是联想的固件问题,有可能发生在系统更新,或者突然的不正常关机或长时间电池过热,原因我不是很清楚。 既然发生了,根据我收集的解决方法,…...
stm32与esp8266通信
esp8266 #include <ESP8266WiFi.h> #include <ESP8266HTTPClient.h>// 测试HTTP请求用的URL // #define URL "http://162.14.107.118:8086/PC/modifyFoodPrice/0/6"// 测试HTTP请求用的URL // 设置wifi接入信息(请根据您的WiFi信息进行修改) const char…...
组合数 2.1 2.2
O(nlogn)预处理, O(1)查询 #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define endl \nusing namespace std;typedef pair<int, int> PII; typedef long long ll; typedef long double ld;const int N 1000…...
【数组的中心位置】python实现-附ChatGPT解析
1.题目 数组的中心位置 题目 给你一个整数数组 nums,请计算数组的中心位置。 数组中心位置是数组的一个下标,其左侧所有元素相乘的积等于右侧所有元素相乘的积。 数组第一个元素的左侧积为 1,最后一个元素的右侧积为 1。 如果数组有多个中心位置,应该返回最靠近左边的那一个…...
黑马JVM总结(二十三)
(1)字节码指令-init 方法体内有一些字节,对应着将来要由java虚拟机执行方法内的代码,构造方法里5个字节代码,main方法里有9个字节的代码 java虚拟机呢内部有一个解释器,这个解释器呢可以识别平台无关的字…...
AI人体行为分析:玩手机/打电话/摔倒/攀爬/扭打检测及TSINGSEE场景解决方案
一、AI人体行为分析技术概述及场景 人体姿态分析/行为分析/动作识别AI算法,是一种利用人工智能技术对人体行为进行检测、跟踪和分析的方法。通过计算机视觉、深度学习和模式识别等技术,可以实现对人体姿态、动作和行为的自动化识别与分析。 在场景应用…...
HI_NAS linux 记录
dev/root 100% 占用解决记录 通过下面的命令查看各文件夹 大小 sudo du --max-depth1 -h # 统计当前文件夹下各个文件夹的大小显示为M 最终发现Var/log 占用很大空间 发现下面两个 log 占用空间很大,直接 rm-rf 即可 HI NAS python3 记录 # 安装pip3 sudo apt u…...
计算机图形学中的几何光学
文章目录 前言一、图形学中的光学二、光照模型1、经验型(简单)2、物理型(复杂) 前言 在学习Shader光照之前了解一下计算机图形学 一、图形学中的光学 镜面反射的效果例子:物体表面高光 慢反射的效果的例子:…...
「UG/NX」BlockUI 选择小平面区域 Select Facet Region
✨博客主页何曾参静谧的博客📌文章专栏「UG/NX」BlockUI集合📚全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「DSA」数据结构与算法「File」数据文件格式 目录 控件说…...
【完全二叉树魔法:顺序结构实现堆的奇象】
本章重点 二叉树的顺序结构堆的概念及结构堆的实现堆的调整算法堆的创建堆排序TOP-K问题 1.二叉树的顺序结构 普通的二叉树是不适合用数组来存储的,因为可能会存在大量的空间浪费。而完全二叉树更适合使用顺序结构存储。现实中我们通常把堆(一种二叉树)使用顺序结构…...
Maven官方镜像仓库与阿里云云效Maven
一、Maven官方镜像仓库 download maven-3 右击复制链接地址,使用wget命令直接在linux中下载: wget 链接地址history 二、阿里云云效Maven 详情查看maven 配置指南 打开 maven 的配置文件( windows 机器一般在 maven 安装目录的 conf/…...
python系列教程215——列表解析与矩阵
朋友们,如需转载请标明出处:https://blog.csdn.net/jiangjunshow 声明:在人工智能技术教学期间,不少学生向我提一些python相关的问题,所以为了让同学们掌握更多扩展知识更好地理解AI技术,我让助理负责分享…...
Charticulator:突破传统桎梏的自定义数据可视化革新——从模板依赖到自由创作
Charticulator:突破传统桎梏的自定义数据可视化革新——从模板依赖到自由创作 【免费下载链接】charticulator Interactive Layout-Aware Construction of Bespoke Charts 项目地址: https://gitcode.com/gh_mirrors/ch/charticulator 数据可视化工具是否常常…...
图像超分新思路:拆解SCNet的‘空间移位’操作,看它如何用零参数实现3x3卷积的效果
图像超分辨率革命:零参数空间移位如何颠覆传统卷积设计 当你在手机相册里翻出一张十年前的老照片,是否曾幻想过能一键修复那些模糊的像素?这正是图像超分辨率技术试图解决的难题。传统方法依赖计算密集的33卷积,而SCNet提出的&quo…...
Beekeeper Studio:现代跨平台数据库管理工具的技术架构与实战应用
Beekeeper Studio:现代跨平台数据库管理工具的技术架构与实战应用 【免费下载链接】beekeeper-studio beekeeper-studio/beekeeper-studio: Beekeeper Studio 是一款开源的跨平台数据库客户端工具,支持多种数据库(如MySQL, PostgreSQL, SQLit…...
语义分割竞赛必备:5种Loss函数组合效果对比(含Dice+Focal Loss调参指南)
语义分割竞赛进阶:5种损失函数组合实战评测与调参策略 在Kaggle等数据竞赛中,语义分割任务的性能提升往往取决于损失函数的巧妙选择与组合。不同于常规分类任务,多类别像素级预测需要处理极端类别不平衡、边界模糊等独特挑战。本文将深入剖析…...
秀米能做的它都行,AI 写作让内容生产更简单
「选题想破头,初稿磨半天,排版更费神。」这或许是当下许多小编、运营乃至企业内容负责人的日常写照。内容需求暴涨,但高质量产出一直是道门槛。传统的编辑器,如秀米等,已极大简化了图文排版与可视化编辑的流程…...
Windows服务器部署:OpenClaw守护进程+Qwen3-32B镜像长期运行
Windows服务器部署:OpenClaw守护进程Qwen3-32B镜像长期运行 1. 为什么需要服务器级部署? 去年我尝试在个人笔记本上运行OpenClaw时,经常遇到两个头疼的问题:一是夜间执行任务时电脑休眠导致流程中断,二是长时间运行后…...
OpenClaw 的 Skill免费开源的
OpenClaw 的 Skill 生态非常丰富,其中绝大部分都是免费开源的。以下为您推荐几类实用的免费插件,您可以根据需求选择安装。🛡️ 一、安全与权限控制 (强烈建议优先安装)skill-vetter / clawsec功能:安装插件前自动扫描代码&#x…...
路径规划算法技术选型与实战指南:从理论到工程落地
路径规划算法技术选型与实战指南:从理论到工程落地 【免费下载链接】PathPlanning Common used path planning algorithms with animations. 项目地址: https://gitcode.com/gh_mirrors/pa/PathPlanning 当仓库机器人在密集货架间灵活避障,当无人…...
DeepSeek-OCR实战教程:批量处理脚本编写与异步解析任务队列设计
DeepSeek-OCR实战教程:批量处理脚本编写与异步解析任务队列设计 1. 学习目标与场景引入 如果你正在处理大量的文档图片,比如扫描的合同、发票、报告或者历史档案,一张张上传到DeepSeek-OCR界面手动处理,不仅效率低下,…...
深入TC397与TLF35584的SPI通信:从寄存器操作到汽车ECU低功耗状态管理实战
深入TC397与TLF35584的SPI通信:从寄存器操作到汽车ECU低功耗状态管理实战 在汽车电子领域,电源管理芯片的选择与配置直接关系到整车电子控制单元(ECU)的可靠性与能耗表现。英飞凌的TLF35584作为一款高集成度电源管理IC,…...
