kafka消费者多线程开发
目录
前言
kafka consumer 设计原理
多线程的方案
参考资料
前言
目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。
kafka consumer 设计原理
从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。
所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。
单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。
多线程的方案
我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。
由于kafka consumer不是线程安全,我么你能制定两种多线程的方案。
1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:
2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:
我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。
这两种方案的比较如下:
实现代码示例如下:
方案一的代码:
public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));// 执行消息处理逻辑}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}
这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构
方案2 的代码:
private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...private int workerNum = ...;
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());...
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (final ConsumerRecord record : records) {executors.submit(new Worker(record));}
}
..
参考资料
20 | 多线程开发消费者实例-极客时间
相关文章:

kafka消费者多线程开发
目录 前言 kafka consumer 设计原理 多线程的方案 参考资料 前言 目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单…...

布局设计和实现:计算器UI【TableLayout、GridLayout】
一、使用TableLayout实现计算器UI 1.新建一个空白项目布局 根据自己的需求输入其他信息 填写完成后,点击Finish即可 2. 设计UI界面 在res/layout文件夹中的XML文件中创建UI界面。在这个XML文件中,您可以使用TableLayout来设计计算器界面。 2.1 创建l…...

stack与queue的简单封装
前言: stack与queue即栈和队列,先进后出/先进先出的特性我们早已了然于心, 在学习数据结构时,我们利用c语言实现栈与队列,从结构体写起,利用数组或指针表示他们的数据成员,之后再一个个实现他们…...
ChatGPT使用技巧整理
目录 1. 让ChatGPT扮演专家角色2. 告诉ChatGPT你的身份3. 限制ChatGPT的回答长度4. 让ChatGPT一步步思考5. 明确你的要求和目的6. 提供充分的背景信息7. 始终结构化思考你的prompt1. 让ChatGPT扮演专家角色 当你们讨论的是市场营销问题时,你可以要求ChatGPT扮演一个具有20年从…...
机器学习笔记 - 维度诅咒的数学表达
1、点之间的距离 kNN分类器假设相似的点也可能有相同的标签。但是,在高维空间中,从概率分布中得出的点往往不会始终靠近在一起。 我们可以用一个简单的例子来说明这一点。 我们将在单位立方体内均匀地随机绘制点(如图所示),并研究该立方体内测试点的 k 个最近邻将占用多少…...
组合计数训练题解
CF40E 题目链接 点击打开链接 题目解法 首先,如果 n , m n,m n,m 一奇一偶,那么答案为 0 0 0 原因是从行和列的角度分析, − 1 -1 −1 个数的奇偶性不同 可以发现 k < max { n , m } k<\max\{n,m\} k<max{n,m} 的性质很微…...
P1095 [NOIP2007 普及组] 守望者的逃离
[NOIP2007 普及组] 守望者的逃离 - 洛谷 首先DP的套路就是先找状态 这题也找不出其他的状态了,只有时间一个 所以用f[i]表示时刻i能走多远 而仔细一想实际上决策只有跑、闪现、停三种决策 然而闪现的耗蓝要和跑步一同计算十分麻烦 于是把它们分开算࿱…...

Python函数绘图与高等代数互融实例(八):箱线图|误差棒图|堆积图
Python函数绘图与高等代数互融实例(一):正弦函数与余弦函数 Python函数绘图与高等代数互融实例(二):闪点函数 Python函数绘图与高等代数互融实例(三):设置X|Y轴|网格线 Python函数绘图与高等代数互融实例(四):设置X|Y轴参考线|参考区域 Python函数绘图与高等代数互融实例(五…...

联想y7000 y7000p 2018/2019 不插电源 不插充电器, 直接关机 ,电量一直89%/87%/86%,V0005如何解决?
这种问题,没有外力破坏的话,电池不可能突然出事。这种一般是联想的固件问题,有可能发生在系统更新,或者突然的不正常关机或长时间电池过热,原因我不是很清楚。 既然发生了,根据我收集的解决方法,…...
stm32与esp8266通信
esp8266 #include <ESP8266WiFi.h> #include <ESP8266HTTPClient.h>// 测试HTTP请求用的URL // #define URL "http://162.14.107.118:8086/PC/modifyFoodPrice/0/6"// 测试HTTP请求用的URL // 设置wifi接入信息(请根据您的WiFi信息进行修改) const char…...
组合数 2.1 2.2
O(nlogn)预处理, O(1)查询 #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define endl \nusing namespace std;typedef pair<int, int> PII; typedef long long ll; typedef long double ld;const int N 1000…...
【数组的中心位置】python实现-附ChatGPT解析
1.题目 数组的中心位置 题目 给你一个整数数组 nums,请计算数组的中心位置。 数组中心位置是数组的一个下标,其左侧所有元素相乘的积等于右侧所有元素相乘的积。 数组第一个元素的左侧积为 1,最后一个元素的右侧积为 1。 如果数组有多个中心位置,应该返回最靠近左边的那一个…...

黑马JVM总结(二十三)
(1)字节码指令-init 方法体内有一些字节,对应着将来要由java虚拟机执行方法内的代码,构造方法里5个字节代码,main方法里有9个字节的代码 java虚拟机呢内部有一个解释器,这个解释器呢可以识别平台无关的字…...

AI人体行为分析:玩手机/打电话/摔倒/攀爬/扭打检测及TSINGSEE场景解决方案
一、AI人体行为分析技术概述及场景 人体姿态分析/行为分析/动作识别AI算法,是一种利用人工智能技术对人体行为进行检测、跟踪和分析的方法。通过计算机视觉、深度学习和模式识别等技术,可以实现对人体姿态、动作和行为的自动化识别与分析。 在场景应用…...

HI_NAS linux 记录
dev/root 100% 占用解决记录 通过下面的命令查看各文件夹 大小 sudo du --max-depth1 -h # 统计当前文件夹下各个文件夹的大小显示为M 最终发现Var/log 占用很大空间 发现下面两个 log 占用空间很大,直接 rm-rf 即可 HI NAS python3 记录 # 安装pip3 sudo apt u…...

计算机图形学中的几何光学
文章目录 前言一、图形学中的光学二、光照模型1、经验型(简单)2、物理型(复杂) 前言 在学习Shader光照之前了解一下计算机图形学 一、图形学中的光学 镜面反射的效果例子:物体表面高光 慢反射的效果的例子:…...

「UG/NX」BlockUI 选择小平面区域 Select Facet Region
✨博客主页何曾参静谧的博客📌文章专栏「UG/NX」BlockUI集合📚全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「DSA」数据结构与算法「File」数据文件格式 目录 控件说…...

【完全二叉树魔法:顺序结构实现堆的奇象】
本章重点 二叉树的顺序结构堆的概念及结构堆的实现堆的调整算法堆的创建堆排序TOP-K问题 1.二叉树的顺序结构 普通的二叉树是不适合用数组来存储的,因为可能会存在大量的空间浪费。而完全二叉树更适合使用顺序结构存储。现实中我们通常把堆(一种二叉树)使用顺序结构…...

Maven官方镜像仓库与阿里云云效Maven
一、Maven官方镜像仓库 download maven-3 右击复制链接地址,使用wget命令直接在linux中下载: wget 链接地址history 二、阿里云云效Maven 详情查看maven 配置指南 打开 maven 的配置文件( windows 机器一般在 maven 安装目录的 conf/…...
python系列教程215——列表解析与矩阵
朋友们,如需转载请标明出处:https://blog.csdn.net/jiangjunshow 声明:在人工智能技术教学期间,不少学生向我提一些python相关的问题,所以为了让同学们掌握更多扩展知识更好地理解AI技术,我让助理负责分享…...

第19节 Node.js Express 框架
Express 是一个为Node.js设计的web开发框架,它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用,和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...
基于服务器使用 apt 安装、配置 Nginx
🧾 一、查看可安装的 Nginx 版本 首先,你可以运行以下命令查看可用版本: apt-cache madison nginx-core输出示例: nginx-core | 1.18.0-6ubuntu14.6 | http://archive.ubuntu.com/ubuntu focal-updates/main amd64 Packages ng…...

【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...

大数据零基础学习day1之环境准备和大数据初步理解
学习大数据会使用到多台Linux服务器。 一、环境准备 1、VMware 基于VMware构建Linux虚拟机 是大数据从业者或者IT从业者的必备技能之一也是成本低廉的方案 所以VMware虚拟机方案是必须要学习的。 (1)设置网关 打开VMware虚拟机,点击编辑…...
【算法训练营Day07】字符串part1
文章目录 反转字符串反转字符串II替换数字 反转字符串 题目链接:344. 反转字符串 双指针法,两个指针的元素直接调转即可 class Solution {public void reverseString(char[] s) {int head 0;int end s.length - 1;while(head < end) {char temp …...
css的定位(position)详解:相对定位 绝对定位 固定定位
在 CSS 中,元素的定位通过 position 属性控制,共有 5 种定位模式:static(静态定位)、relative(相对定位)、absolute(绝对定位)、fixed(固定定位)和…...

html css js网页制作成品——HTML+CSS榴莲商城网页设计(4页)附源码
目录 一、👨🎓网站题目 二、✍️网站描述 三、📚网站介绍 四、🌐网站效果 五、🪓 代码实现 🧱HTML 六、🥇 如何让学习不再盲目 七、🎁更多干货 一、👨…...
Java + Spring Boot + Mybatis 实现批量插入
在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法:使用 MyBatis 的 <foreach> 标签和批处理模式(ExecutorType.BATCH)。 方法一:使用 XML 的 <foreach> 标签ÿ…...

Linux 下 DMA 内存映射浅析
序 系统 I/O 设备驱动程序通常调用其特定子系统的接口为 DMA 分配内存,但最终会调到 DMA 子系统的dma_alloc_coherent()/dma_alloc_attrs() 等接口。 关于 dma_alloc_coherent 接口详细的代码讲解、调用流程,可以参考这篇文章,我觉得写的非常…...

macOS 终端智能代理检测
🧠 终端智能代理检测:自动判断是否需要设置代理访问 GitHub 在开发中,使用 GitHub 是非常常见的需求。但有时候我们会发现某些命令失败、插件无法更新,例如: fatal: unable to access https://github.com/ohmyzsh/oh…...