当前位置: 首页 > news >正文

kafka消费者多线程开发

目录

前言

kafka consumer 设计原理

多线程的方案 

参考资料


前言

目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。

kafka consumer 设计原理

 从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。

 所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

 单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。

多线程的方案 

 我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。

由于kafka consumer不是线程安全,我么你能制定两种多线程的方案。

1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

 2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。 

这两种方案的比较如下:

实现代码示例如下:

方案一的代码:

public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));//  执行消息处理逻辑}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}

这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构

方案2 的代码:

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...private int workerNum = ...;
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());...
while (true)  {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (final ConsumerRecord record : records) {executors.submit(new Worker(record));}
}
..

参考资料

20 | 多线程开发消费者实例-极客时间

相关文章:

kafka消费者多线程开发

目录 前言 kafka consumer 设计原理 多线程的方案 参考资料 前言 目前&#xff0c;计算机的硬件条件已经大大改善&#xff0c;即使是在普通的笔记本电脑上&#xff0c;多核都已经是标配了&#xff0c;更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单…...

布局设计和实现:计算器UI【TableLayout、GridLayout】

一、使用TableLayout实现计算器UI 1.新建一个空白项目布局 根据自己的需求输入其他信息 填写完成后&#xff0c;点击Finish即可 2. 设计UI界面 在res/layout文件夹中的XML文件中创建UI界面。在这个XML文件中&#xff0c;您可以使用TableLayout来设计计算器界面。 2.1 创建l…...

stack与queue的简单封装

前言&#xff1a; stack与queue即栈和队列&#xff0c;先进后出/先进先出的特性我们早已了然于心&#xff0c; 在学习数据结构时&#xff0c;我们利用c语言实现栈与队列&#xff0c;从结构体写起&#xff0c;利用数组或指针表示他们的数据成员&#xff0c;之后再一个个实现他们…...

ChatGPT使用技巧整理

目录 1. 让ChatGPT扮演专家角色2. 告诉ChatGPT你的身份3. 限制ChatGPT的回答长度4. 让ChatGPT一步步思考5. 明确你的要求和目的6. 提供充分的背景信息7. 始终结构化思考你的prompt1. 让ChatGPT扮演专家角色 当你们讨论的是市场营销问题时,你可以要求ChatGPT扮演一个具有20年从…...

机器学习笔记 - 维度诅咒的数学表达

1、点之间的距离 kNN分类器假设相似的点也可能有相同的标签。但是,在高维空间中,从概率分布中得出的点往往不会始终靠近在一起。 我们可以用一个简单的例子来说明这一点。 我们将在单位立方体内均匀地随机绘制点(如图所示),并研究该立方体内测试点的 k 个最近邻将占用多少…...

组合计数训练题解

CF40E 题目链接 点击打开链接 题目解法 首先&#xff0c;如果 n , m n,m n,m 一奇一偶&#xff0c;那么答案为 0 0 0 原因是从行和列的角度分析&#xff0c; − 1 -1 −1 个数的奇偶性不同 可以发现 k < max ⁡ { n , m } k<\max\{n,m\} k<max{n,m} 的性质很微…...

P1095 [NOIP2007 普及组] 守望者的逃离

[NOIP2007 普及组] 守望者的逃离 - 洛谷 首先DP的套路就是先找状态 这题也找不出其他的状态了&#xff0c;只有时间一个 所以用f[i]表示时刻i能走多远 而仔细一想实际上决策只有跑、闪现、停三种决策 然而闪现的耗蓝要和跑步一同计算十分麻烦 于是把它们分开算&#xff1…...

Python函数绘图与高等代数互融实例(八):箱线图|误差棒图|堆积图

Python函数绘图与高等代数互融实例(一):正弦函数与余弦函数 Python函数绘图与高等代数互融实例(二):闪点函数 Python函数绘图与高等代数互融实例(三):设置X|Y轴|网格线 Python函数绘图与高等代数互融实例(四):设置X|Y轴参考线|参考区域 Python函数绘图与高等代数互融实例(五…...

联想y7000 y7000p 2018/2019 不插电源 不插充电器, 直接关机 ,电量一直89%/87%/86%,V0005如何解决?

这种问题&#xff0c;没有外力破坏的话&#xff0c;电池不可能突然出事。这种一般是联想的固件问题&#xff0c;有可能发生在系统更新&#xff0c;或者突然的不正常关机或长时间电池过热&#xff0c;原因我不是很清楚。 既然发生了&#xff0c;根据我收集的解决方法&#xff0c…...

stm32与esp8266通信

esp8266 #include <ESP8266WiFi.h> #include <ESP8266HTTPClient.h>// 测试HTTP请求用的URL // #define URL "http://162.14.107.118:8086/PC/modifyFoodPrice/0/6"// 测试HTTP请求用的URL // 设置wifi接入信息(请根据您的WiFi信息进行修改) const char…...

组合数 2.1 2.2

O(nlogn)预处理&#xff0c; O(1)查询 #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define endl \nusing namespace std;typedef pair<int, int> PII; typedef long long ll; typedef long double ld;const int N 1000…...

【数组的中心位置】python实现-附ChatGPT解析

1.题目 数组的中心位置 题目 给你一个整数数组 nums,请计算数组的中心位置。 数组中心位置是数组的一个下标,其左侧所有元素相乘的积等于右侧所有元素相乘的积。 数组第一个元素的左侧积为 1,最后一个元素的右侧积为 1。 如果数组有多个中心位置,应该返回最靠近左边的那一个…...

黑马JVM总结(二十三)

&#xff08;1&#xff09;字节码指令-init 方法体内有一些字节&#xff0c;对应着将来要由java虚拟机执行方法内的代码&#xff0c;构造方法里5个字节代码&#xff0c;main方法里有9个字节的代码 java虚拟机呢内部有一个解释器&#xff0c;这个解释器呢可以识别平台无关的字…...

AI人体行为分析:玩手机/打电话/摔倒/攀爬/扭打检测及TSINGSEE场景解决方案

一、AI人体行为分析技术概述及场景 人体姿态分析/行为分析/动作识别AI算法&#xff0c;是一种利用人工智能技术对人体行为进行检测、跟踪和分析的方法。通过计算机视觉、深度学习和模式识别等技术&#xff0c;可以实现对人体姿态、动作和行为的自动化识别与分析。 在场景应用…...

HI_NAS linux 记录

dev/root 100% 占用解决记录 通过下面的命令查看各文件夹 大小 sudo du --max-depth1 -h # 统计当前文件夹下各个文件夹的大小显示为M 最终发现Var/log 占用很大空间 发现下面两个 log 占用空间很大&#xff0c;直接 rm-rf 即可 HI NAS python3 记录 # 安装pip3 sudo apt u…...

计算机图形学中的几何光学

文章目录 前言一、图形学中的光学二、光照模型1、经验型&#xff08;简单&#xff09;2、物理型&#xff08;复杂&#xff09; 前言 在学习Shader光照之前了解一下计算机图形学 一、图形学中的光学 镜面反射的效果例子&#xff1a;物体表面高光 慢反射的效果的例子&#xff1a…...

「UG/NX」BlockUI 选择小平面区域 Select Facet Region

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「UG/NX」BlockUI集合&#x1f4da;全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「DSA」数据结构与算法「File」数据文件格式 目录 控件说…...

【完全二叉树魔法:顺序结构实现堆的奇象】

本章重点 二叉树的顺序结构堆的概念及结构堆的实现堆的调整算法堆的创建堆排序TOP-K问题 1.二叉树的顺序结构 普通的二叉树是不适合用数组来存储的&#xff0c;因为可能会存在大量的空间浪费。而完全二叉树更适合使用顺序结构存储。现实中我们通常把堆(一种二叉树)使用顺序结构…...

Maven官方镜像仓库与阿里云云效Maven

一、Maven官方镜像仓库 download maven-3 右击复制链接地址&#xff0c;使用wget命令直接在linux中下载&#xff1a; wget 链接地址history 二、阿里云云效Maven 详情查看maven 配置指南 打开 maven 的配置文件&#xff08; windows 机器一般在 maven 安装目录的 conf/…...

python系列教程215——列表解析与矩阵

朋友们&#xff0c;如需转载请标明出处&#xff1a;https://blog.csdn.net/jiangjunshow 声明&#xff1a;在人工智能技术教学期间&#xff0c;不少学生向我提一些python相关的问题&#xff0c;所以为了让同学们掌握更多扩展知识更好地理解AI技术&#xff0c;我让助理负责分享…...

微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】

微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来&#xff0c;Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...

在rocky linux 9.5上在线安装 docker

前面是指南&#xff0c;后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

解锁数据库简洁之道:FastAPI与SQLModel实战指南

在构建现代Web应用程序时&#xff0c;与数据库的交互无疑是核心环节。虽然传统的数据库操作方式&#xff08;如直接编写SQL语句与psycopg2交互&#xff09;赋予了我们精细的控制权&#xff0c;但在面对日益复杂的业务逻辑和快速迭代的需求时&#xff0c;这种方式的开发效率和可…...

Nuxt.js 中的路由配置详解

Nuxt.js 通过其内置的路由系统简化了应用的路由配置&#xff0c;使得开发者可以轻松地管理页面导航和 URL 结构。路由配置主要涉及页面组件的组织、动态路由的设置以及路由元信息的配置。 自动路由生成 Nuxt.js 会根据 pages 目录下的文件结构自动生成路由配置。每个文件都会对…...

学习STC51单片机31(芯片为STC89C52RCRC)OLED显示屏1

每日一言 生活的美好&#xff0c;总是藏在那些你咬牙坚持的日子里。 硬件&#xff1a;OLED 以后要用到OLED的时候找到这个文件 OLED的设备地址 SSD1306"SSD" 是品牌缩写&#xff0c;"1306" 是产品编号。 驱动 OLED 屏幕的 IIC 总线数据传输格式 示意图 …...

Device Mapper 机制

Device Mapper 机制详解 Device Mapper&#xff08;简称 DM&#xff09;是 Linux 内核中的一套通用块设备映射框架&#xff0c;为 LVM、加密磁盘、RAID 等提供底层支持。本文将详细介绍 Device Mapper 的原理、实现、内核配置、常用工具、操作测试流程&#xff0c;并配以详细的…...

xmind转换为markdown

文章目录 解锁思维导图新姿势&#xff1a;将XMind转为结构化Markdown 一、认识Xmind结构二、核心转换流程详解1.解压XMind文件&#xff08;ZIP处理&#xff09;2.解析JSON数据结构3&#xff1a;递归转换树形结构4&#xff1a;Markdown层级生成逻辑 三、完整代码 解锁思维导图新…...

相关类相关的可视化图像总结

目录 一、散点图 二、气泡图 三、相关图 四、热力图 五、二维密度图 六、多模态二维密度图 七、雷达图 八、桑基图 九、总结 一、散点图 特点 通过点的位置展示两个连续变量之间的关系&#xff0c;可直观判断线性相关、非线性相关或无相关关系&#xff0c;点的分布密…...

React父子组件通信:Props怎么用?如何从父组件向子组件传递数据?

系列回顾&#xff1a; 在上一篇《React核心概念&#xff1a;State是什么&#xff1f;》中&#xff0c;我们学习了如何使用useState让一个组件拥有自己的内部数据&#xff08;State&#xff09;&#xff0c;并通过一个计数器案例&#xff0c;实现了组件的自我更新。这很棒&#…...

ABAP设计模式之---“Tell, Don’t Ask原则”

“Tell, Don’t Ask”是一种重要的面向对象编程设计原则&#xff0c;它强调的是对象之间如何有效地交流和协作。 1. 什么是 Tell, Don’t Ask 原则&#xff1f; 这个原则的核心思想是&#xff1a; “告诉一个对象该做什么&#xff0c;而不是询问一个对象的状态再对它作出决策。…...