kafka消费者多线程开发
目录
前言
kafka consumer 设计原理
多线程的方案
参考资料
前言
目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。
kafka consumer 设计原理
从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。
所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。
多线程的方案
我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
由于kafka consumer不是线程安全,我么你能制定两种多线程的方案。
1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。
这两种方案的比较如下:

实现代码示例如下:
方案一的代码:
public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));// 执行消息处理逻辑}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}
这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构
方案2 的代码:
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...private int workerNum = ...;
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());...
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (final ConsumerRecord record : records) {executors.submit(new Worker(record));}
}
..
参考资料
20 | 多线程开发消费者实例-极客时间
相关文章:
kafka消费者多线程开发
目录 前言 kafka consumer 设计原理 多线程的方案 参考资料 前言 目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单…...
布局设计和实现:计算器UI【TableLayout、GridLayout】
一、使用TableLayout实现计算器UI 1.新建一个空白项目布局 根据自己的需求输入其他信息 填写完成后,点击Finish即可 2. 设计UI界面 在res/layout文件夹中的XML文件中创建UI界面。在这个XML文件中,您可以使用TableLayout来设计计算器界面。 2.1 创建l…...
stack与queue的简单封装
前言: stack与queue即栈和队列,先进后出/先进先出的特性我们早已了然于心, 在学习数据结构时,我们利用c语言实现栈与队列,从结构体写起,利用数组或指针表示他们的数据成员,之后再一个个实现他们…...
ChatGPT使用技巧整理
目录 1. 让ChatGPT扮演专家角色2. 告诉ChatGPT你的身份3. 限制ChatGPT的回答长度4. 让ChatGPT一步步思考5. 明确你的要求和目的6. 提供充分的背景信息7. 始终结构化思考你的prompt1. 让ChatGPT扮演专家角色 当你们讨论的是市场营销问题时,你可以要求ChatGPT扮演一个具有20年从…...
机器学习笔记 - 维度诅咒的数学表达
1、点之间的距离 kNN分类器假设相似的点也可能有相同的标签。但是,在高维空间中,从概率分布中得出的点往往不会始终靠近在一起。 我们可以用一个简单的例子来说明这一点。 我们将在单位立方体内均匀地随机绘制点(如图所示),并研究该立方体内测试点的 k 个最近邻将占用多少…...
组合计数训练题解
CF40E 题目链接 点击打开链接 题目解法 首先,如果 n , m n,m n,m 一奇一偶,那么答案为 0 0 0 原因是从行和列的角度分析, − 1 -1 −1 个数的奇偶性不同 可以发现 k < max { n , m } k<\max\{n,m\} k<max{n,m} 的性质很微…...
P1095 [NOIP2007 普及组] 守望者的逃离
[NOIP2007 普及组] 守望者的逃离 - 洛谷 首先DP的套路就是先找状态 这题也找不出其他的状态了,只有时间一个 所以用f[i]表示时刻i能走多远 而仔细一想实际上决策只有跑、闪现、停三种决策 然而闪现的耗蓝要和跑步一同计算十分麻烦 于是把它们分开算࿱…...
Python函数绘图与高等代数互融实例(八):箱线图|误差棒图|堆积图
Python函数绘图与高等代数互融实例(一):正弦函数与余弦函数 Python函数绘图与高等代数互融实例(二):闪点函数 Python函数绘图与高等代数互融实例(三):设置X|Y轴|网格线 Python函数绘图与高等代数互融实例(四):设置X|Y轴参考线|参考区域 Python函数绘图与高等代数互融实例(五…...
联想y7000 y7000p 2018/2019 不插电源 不插充电器, 直接关机 ,电量一直89%/87%/86%,V0005如何解决?
这种问题,没有外力破坏的话,电池不可能突然出事。这种一般是联想的固件问题,有可能发生在系统更新,或者突然的不正常关机或长时间电池过热,原因我不是很清楚。 既然发生了,根据我收集的解决方法,…...
stm32与esp8266通信
esp8266 #include <ESP8266WiFi.h> #include <ESP8266HTTPClient.h>// 测试HTTP请求用的URL // #define URL "http://162.14.107.118:8086/PC/modifyFoodPrice/0/6"// 测试HTTP请求用的URL // 设置wifi接入信息(请根据您的WiFi信息进行修改) const char…...
组合数 2.1 2.2
O(nlogn)预处理, O(1)查询 #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define endl \nusing namespace std;typedef pair<int, int> PII; typedef long long ll; typedef long double ld;const int N 1000…...
【数组的中心位置】python实现-附ChatGPT解析
1.题目 数组的中心位置 题目 给你一个整数数组 nums,请计算数组的中心位置。 数组中心位置是数组的一个下标,其左侧所有元素相乘的积等于右侧所有元素相乘的积。 数组第一个元素的左侧积为 1,最后一个元素的右侧积为 1。 如果数组有多个中心位置,应该返回最靠近左边的那一个…...
黑马JVM总结(二十三)
(1)字节码指令-init 方法体内有一些字节,对应着将来要由java虚拟机执行方法内的代码,构造方法里5个字节代码,main方法里有9个字节的代码 java虚拟机呢内部有一个解释器,这个解释器呢可以识别平台无关的字…...
AI人体行为分析:玩手机/打电话/摔倒/攀爬/扭打检测及TSINGSEE场景解决方案
一、AI人体行为分析技术概述及场景 人体姿态分析/行为分析/动作识别AI算法,是一种利用人工智能技术对人体行为进行检测、跟踪和分析的方法。通过计算机视觉、深度学习和模式识别等技术,可以实现对人体姿态、动作和行为的自动化识别与分析。 在场景应用…...
HI_NAS linux 记录
dev/root 100% 占用解决记录 通过下面的命令查看各文件夹 大小 sudo du --max-depth1 -h # 统计当前文件夹下各个文件夹的大小显示为M 最终发现Var/log 占用很大空间 发现下面两个 log 占用空间很大,直接 rm-rf 即可 HI NAS python3 记录 # 安装pip3 sudo apt u…...
计算机图形学中的几何光学
文章目录 前言一、图形学中的光学二、光照模型1、经验型(简单)2、物理型(复杂) 前言 在学习Shader光照之前了解一下计算机图形学 一、图形学中的光学 镜面反射的效果例子:物体表面高光 慢反射的效果的例子:…...
「UG/NX」BlockUI 选择小平面区域 Select Facet Region
✨博客主页何曾参静谧的博客📌文章专栏「UG/NX」BlockUI集合📚全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「DSA」数据结构与算法「File」数据文件格式 目录 控件说…...
【完全二叉树魔法:顺序结构实现堆的奇象】
本章重点 二叉树的顺序结构堆的概念及结构堆的实现堆的调整算法堆的创建堆排序TOP-K问题 1.二叉树的顺序结构 普通的二叉树是不适合用数组来存储的,因为可能会存在大量的空间浪费。而完全二叉树更适合使用顺序结构存储。现实中我们通常把堆(一种二叉树)使用顺序结构…...
Maven官方镜像仓库与阿里云云效Maven
一、Maven官方镜像仓库 download maven-3 右击复制链接地址,使用wget命令直接在linux中下载: wget 链接地址history 二、阿里云云效Maven 详情查看maven 配置指南 打开 maven 的配置文件( windows 机器一般在 maven 安装目录的 conf/…...
python系列教程215——列表解析与矩阵
朋友们,如需转载请标明出处:https://blog.csdn.net/jiangjunshow 声明:在人工智能技术教学期间,不少学生向我提一些python相关的问题,所以为了让同学们掌握更多扩展知识更好地理解AI技术,我让助理负责分享…...
Windows风扇控制终极指南:5分钟学会FanControl智能调校
Windows风扇控制终极指南:5分钟学会FanControl智能调校 【免费下载链接】FanControl.Releases This is the release repository for Fan Control, a highly customizable fan controlling software for Windows. 项目地址: https://gitcode.com/GitHub_Trending/f…...
Davinci vs. 其他开源BI工具(Superset/Metabase)实战对比:我们团队为什么最终选了它?
Davinci vs. 其他开源BI工具实战对比:技术选型的深度思考 在数据驱动决策的时代,企业级BI工具的选择直接影响着数据分析的效率和深度。当我们团队面临开源BI工具选型时,Davinci、Apache Superset和Metabase成为了主要候选对象。经过三个月的实…...
打卡信奥刷题(3245)用C++实现信奥题 P8563 Magenta Potion
P8563 Magenta Potion 题目描述 给定一个长为 nnn 的整数序列 aaa,其中所有数的绝对值均大于等于 222。有 qqq 次操作,格式如下: 1 i k\texttt{1 i k}1 i k,表示将 aia_iai 修改为 kkk。保证 $k $ 的绝对值大于等于 222。 2 l r…...
数据标注平台搭建:支持主动学习的智能标注工具
在软件测试领域,测试数据的质量直接决定了测试覆盖率和缺陷发现能力。随着AI驱动测试的兴起,高质量标注数据成为训练测试预言、缺陷预测模型、自动化测试脚本生成等智能测试工具的核心资产。然而,传统的人工标注方式效率低下、一致性差&#…...
避开C2000开发第一个坑:TMS320F28069的InitSysCtrl()函数里,为什么ADC时钟要开一下又关?
TMS320F28069开发揭秘:ADC时钟瞬启瞬闭背后的硬件校准逻辑 在TMS320F28069的InitSysCtrl()初始化函数中,有一段看似矛盾的代码操作:先启用ADC时钟,调用(*Device_cal)()函数后立即关闭。这个"开关ADC时钟"的瞬态操作绝非…...
通信行业硅转向:从专用ASIC到软件定义网络的架构演进
1. 项目概述:通信行业的硅转向 如果你在2016年前后关注过通信设备行业,尤其是那些做核心路由器、骨干网交换机的“大厂”,你大概能感受到一种山雨欲来的氛围。当时,一篇来自EE Times的报道,标题是“Silicon Shift Ahea…...
AI工程化实战:基于Python工具箱构建生产级AI服务
1. 项目概述:一个AI驱动的Python开发工具箱 最近在GitHub上看到一个挺有意思的项目,叫“antarys-ai/python”。光看名字,你可能会觉得这又是一个普通的Python库或者某个AI框架的封装。但当我深入进去,发现它的定位其实相当独特&am…...
从零到一:在Visual Studio中集成海康机器人工业相机SDK的完整指南
1. 环境准备:搭建开发基础 第一次接触工业相机开发时,我也被各种专业术语和配置步骤搞得头晕眼花。后来发现只要把环境搭建好,后面的开发就会顺利很多。咱们先从最基础的软件安装开始,就像盖房子要先打地基一样。 Visual Studio的…...
3步解锁PotPlayer字幕翻译:让外语视频不再难懂
3步解锁PotPlayer字幕翻译:让外语视频不再难懂 【免费下载链接】PotPlayer_Subtitle_Translate_Baidu PotPlayer 字幕在线翻译插件 - 百度平台 项目地址: https://gitcode.com/gh_mirrors/po/PotPlayer_Subtitle_Translate_Baidu 还在为看不懂的外语视频字幕…...
AD覆铜疑难杂症:从Modified Polygon到“引脚粘连”的排查与设计规避
1. Modified Polygon报错:现象与诊断 最近在做一个六层板设计时,遇到了典型的Modified Polygon报错。当时正在对电源层进行覆铜操作,点击"铺铜"按钮后,软件突然弹出一个红色警告框,显示"Modified Polyg…...
