当前位置: 首页 > news >正文

kafka消费者多线程开发

目录

前言

kafka consumer 设计原理

多线程的方案 

参考资料


前言

目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。

kafka consumer 设计原理

 从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。

 所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

 单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。

多线程的方案 

 我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。

由于kafka consumer不是线程安全,我么你能制定两种多线程的方案。

1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

 2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。 

这两种方案的比较如下:

实现代码示例如下:

方案一的代码:

public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));//  执行消息处理逻辑}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}

这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构

方案2 的代码:

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...private int workerNum = ...;
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());...
while (true)  {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (final ConsumerRecord record : records) {executors.submit(new Worker(record));}
}
..

参考资料

20 | 多线程开发消费者实例-极客时间

相关文章:

kafka消费者多线程开发

目录 前言 kafka consumer 设计原理 多线程的方案 参考资料 前言 目前&#xff0c;计算机的硬件条件已经大大改善&#xff0c;即使是在普通的笔记本电脑上&#xff0c;多核都已经是标配了&#xff0c;更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单…...

布局设计和实现:计算器UI【TableLayout、GridLayout】

一、使用TableLayout实现计算器UI 1.新建一个空白项目布局 根据自己的需求输入其他信息 填写完成后&#xff0c;点击Finish即可 2. 设计UI界面 在res/layout文件夹中的XML文件中创建UI界面。在这个XML文件中&#xff0c;您可以使用TableLayout来设计计算器界面。 2.1 创建l…...

stack与queue的简单封装

前言&#xff1a; stack与queue即栈和队列&#xff0c;先进后出/先进先出的特性我们早已了然于心&#xff0c; 在学习数据结构时&#xff0c;我们利用c语言实现栈与队列&#xff0c;从结构体写起&#xff0c;利用数组或指针表示他们的数据成员&#xff0c;之后再一个个实现他们…...

ChatGPT使用技巧整理

目录 1. 让ChatGPT扮演专家角色2. 告诉ChatGPT你的身份3. 限制ChatGPT的回答长度4. 让ChatGPT一步步思考5. 明确你的要求和目的6. 提供充分的背景信息7. 始终结构化思考你的prompt1. 让ChatGPT扮演专家角色 当你们讨论的是市场营销问题时,你可以要求ChatGPT扮演一个具有20年从…...

机器学习笔记 - 维度诅咒的数学表达

1、点之间的距离 kNN分类器假设相似的点也可能有相同的标签。但是,在高维空间中,从概率分布中得出的点往往不会始终靠近在一起。 我们可以用一个简单的例子来说明这一点。 我们将在单位立方体内均匀地随机绘制点(如图所示),并研究该立方体内测试点的 k 个最近邻将占用多少…...

组合计数训练题解

CF40E 题目链接 点击打开链接 题目解法 首先&#xff0c;如果 n , m n,m n,m 一奇一偶&#xff0c;那么答案为 0 0 0 原因是从行和列的角度分析&#xff0c; − 1 -1 −1 个数的奇偶性不同 可以发现 k < max ⁡ { n , m } k<\max\{n,m\} k<max{n,m} 的性质很微…...

P1095 [NOIP2007 普及组] 守望者的逃离

[NOIP2007 普及组] 守望者的逃离 - 洛谷 首先DP的套路就是先找状态 这题也找不出其他的状态了&#xff0c;只有时间一个 所以用f[i]表示时刻i能走多远 而仔细一想实际上决策只有跑、闪现、停三种决策 然而闪现的耗蓝要和跑步一同计算十分麻烦 于是把它们分开算&#xff1…...

Python函数绘图与高等代数互融实例(八):箱线图|误差棒图|堆积图

Python函数绘图与高等代数互融实例(一):正弦函数与余弦函数 Python函数绘图与高等代数互融实例(二):闪点函数 Python函数绘图与高等代数互融实例(三):设置X|Y轴|网格线 Python函数绘图与高等代数互融实例(四):设置X|Y轴参考线|参考区域 Python函数绘图与高等代数互融实例(五…...

联想y7000 y7000p 2018/2019 不插电源 不插充电器, 直接关机 ,电量一直89%/87%/86%,V0005如何解决?

这种问题&#xff0c;没有外力破坏的话&#xff0c;电池不可能突然出事。这种一般是联想的固件问题&#xff0c;有可能发生在系统更新&#xff0c;或者突然的不正常关机或长时间电池过热&#xff0c;原因我不是很清楚。 既然发生了&#xff0c;根据我收集的解决方法&#xff0c…...

stm32与esp8266通信

esp8266 #include <ESP8266WiFi.h> #include <ESP8266HTTPClient.h>// 测试HTTP请求用的URL // #define URL "http://162.14.107.118:8086/PC/modifyFoodPrice/0/6"// 测试HTTP请求用的URL // 设置wifi接入信息(请根据您的WiFi信息进行修改) const char…...

组合数 2.1 2.2

O(nlogn)预处理&#xff0c; O(1)查询 #include<bits/stdc.h> #define IOS ios::sync_with_stdio(0);cin.tie(0);cout.tie(0); #define endl \nusing namespace std;typedef pair<int, int> PII; typedef long long ll; typedef long double ld;const int N 1000…...

【数组的中心位置】python实现-附ChatGPT解析

1.题目 数组的中心位置 题目 给你一个整数数组 nums,请计算数组的中心位置。 数组中心位置是数组的一个下标,其左侧所有元素相乘的积等于右侧所有元素相乘的积。 数组第一个元素的左侧积为 1,最后一个元素的右侧积为 1。 如果数组有多个中心位置,应该返回最靠近左边的那一个…...

黑马JVM总结(二十三)

&#xff08;1&#xff09;字节码指令-init 方法体内有一些字节&#xff0c;对应着将来要由java虚拟机执行方法内的代码&#xff0c;构造方法里5个字节代码&#xff0c;main方法里有9个字节的代码 java虚拟机呢内部有一个解释器&#xff0c;这个解释器呢可以识别平台无关的字…...

AI人体行为分析:玩手机/打电话/摔倒/攀爬/扭打检测及TSINGSEE场景解决方案

一、AI人体行为分析技术概述及场景 人体姿态分析/行为分析/动作识别AI算法&#xff0c;是一种利用人工智能技术对人体行为进行检测、跟踪和分析的方法。通过计算机视觉、深度学习和模式识别等技术&#xff0c;可以实现对人体姿态、动作和行为的自动化识别与分析。 在场景应用…...

HI_NAS linux 记录

dev/root 100% 占用解决记录 通过下面的命令查看各文件夹 大小 sudo du --max-depth1 -h # 统计当前文件夹下各个文件夹的大小显示为M 最终发现Var/log 占用很大空间 发现下面两个 log 占用空间很大&#xff0c;直接 rm-rf 即可 HI NAS python3 记录 # 安装pip3 sudo apt u…...

计算机图形学中的几何光学

文章目录 前言一、图形学中的光学二、光照模型1、经验型&#xff08;简单&#xff09;2、物理型&#xff08;复杂&#xff09; 前言 在学习Shader光照之前了解一下计算机图形学 一、图形学中的光学 镜面反射的效果例子&#xff1a;物体表面高光 慢反射的效果的例子&#xff1a…...

「UG/NX」BlockUI 选择小平面区域 Select Facet Region

✨博客主页何曾参静谧的博客&#x1f4cc;文章专栏「UG/NX」BlockUI集合&#x1f4da;全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C」C/C程序设计「Win」Windows程序设计「DSA」数据结构与算法「File」数据文件格式 目录 控件说…...

【完全二叉树魔法:顺序结构实现堆的奇象】

本章重点 二叉树的顺序结构堆的概念及结构堆的实现堆的调整算法堆的创建堆排序TOP-K问题 1.二叉树的顺序结构 普通的二叉树是不适合用数组来存储的&#xff0c;因为可能会存在大量的空间浪费。而完全二叉树更适合使用顺序结构存储。现实中我们通常把堆(一种二叉树)使用顺序结构…...

Maven官方镜像仓库与阿里云云效Maven

一、Maven官方镜像仓库 download maven-3 右击复制链接地址&#xff0c;使用wget命令直接在linux中下载&#xff1a; wget 链接地址history 二、阿里云云效Maven 详情查看maven 配置指南 打开 maven 的配置文件&#xff08; windows 机器一般在 maven 安装目录的 conf/…...

python系列教程215——列表解析与矩阵

朋友们&#xff0c;如需转载请标明出处&#xff1a;https://blog.csdn.net/jiangjunshow 声明&#xff1a;在人工智能技术教学期间&#xff0c;不少学生向我提一些python相关的问题&#xff0c;所以为了让同学们掌握更多扩展知识更好地理解AI技术&#xff0c;我让助理负责分享…...

基于大模型的 UI 自动化系统

基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查

在对接支付宝API的时候&#xff0c;遇到了一些问题&#xff0c;记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

云计算——弹性云计算器(ECS)

弹性云服务器&#xff1a;ECS 概述 云计算重构了ICT系统&#xff0c;云计算平台厂商推出使得厂家能够主要关注应用管理而非平台管理的云平台&#xff0c;包含如下主要概念。 ECS&#xff08;Elastic Cloud Server&#xff09;&#xff1a;即弹性云服务器&#xff0c;是云计算…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

剑指offer20_链表中环的入口节点

链表中环的入口节点 给定一个链表&#xff0c;若其中包含环&#xff0c;则输出环的入口节点。 若其中不包含环&#xff0c;则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

Linux --进程控制

本文从以下五个方面来初步认识进程控制&#xff1a; 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程&#xff0c;创建出来的进程就是子进程&#xff0c;原来的进程为父进程。…...

安卓基础(aar)

重新设置java21的环境&#xff0c;临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的&#xff1a; MyApp/ ├── app/ …...

浪潮交换机配置track检测实现高速公路收费网络主备切换NQA

浪潮交换机track配置 项目背景高速网络拓扑网络情况分析通信线路收费网络路由 收费汇聚交换机相应配置收费汇聚track配置 项目背景 在实施省内一条高速公路时遇到的需求&#xff0c;本次涉及的主要是收费汇聚交换机的配置&#xff0c;浪潮网络设备在高速项目很少&#xff0c;通…...

【Go语言基础【13】】函数、闭包、方法

文章目录 零、概述一、函数基础1、函数基础概念2、参数传递机制3、返回值特性3.1. 多返回值3.2. 命名返回值3.3. 错误处理 二、函数类型与高阶函数1. 函数类型定义2. 高阶函数&#xff08;函数作为参数、返回值&#xff09; 三、匿名函数与闭包1. 匿名函数&#xff08;Lambda函…...