当前位置: 首页 > article >正文

Kafka 多线程开发消费者实例

目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。

Kafka Java Consumer 设计原理

在开始探究之前,我先简单阐述下 Kafka Java Consumer 为什么采用单线程的设计。了解了这一点,对我们后面制定多线程方案大有裨益。

谈到 Java Consumer API,最重要的当属它的入口类 KafkaConsumer 了。我们说 KafkaConsumer 是单线程的设计,严格来说这是不准确的。因为,从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程

所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

不过,虽然有心跳线程,但实际的消息获取逻辑依然是在用户主线程中完成的。因此,在消费消息的这个层面上,我们依然可以安全地认为 KafkaConsumer 是单线程的设计。

其实,在社区推出 Java Consumer API 之前,Kafka 中存在着一组统称为 Scala Consumer 的 API。这组 API,或者说这个 Consumer,也被称为老版本 Consumer,目前在新版的 Kafka 代码中已经被完全移除了。

我之所以重提旧事,是想告诉你,老版本 Consumer 是多线程的架构,每个 Consumer 实例在内部为所有订阅的主题分区创建对应的消息获取线程,也称 Fetcher 线程。老版本 Consumer 同时也是阻塞式的(blocking),Consumer 实例启动后,内部会创建很多阻塞式的消息获取迭代器。但在很多场景下,Consumer 端是有非阻塞需求的,比如在流处理应用中执行过滤(filter)、连接(join)、分组(group by)等操作时就不能是阻塞式的。基于这个原因,社区为新版本 Consumer 设计了单线程 + 轮询的机制。这种设计能够较好地实现非阻塞式的消息获取。

除此之外,单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。

另外,不论使用哪种编程语言,单线程的设计都比较容易实现。相反,并不是所有的编程语言都能够很好地支持多线程。从这一点上来说,单线程设计的 Consumer 更容易移植到其他语言上。毕竟,Kafka 社区想要打造上下游生态的话,肯定是希望出现越来越多的客户端的。

多线程方案

了解了单线程的设计原理之后,我们来具体分析一下 KafkaConsumer 这个类的使用方法,以及如何推演出对应的多线程方案。

首先,我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,就是你不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。

当然了,这也不是绝对的。KafkaConsumer 中有个方法是例外的,它就是wakeup(),你可以在其他线程中安全地调用KafkaConsumer.wakeup()来唤醒 Consumer。

鉴于 KafkaConsumer 不是线程安全的事实,我们能够制定两套多线程方案。

  1. 消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

总体来说,这两种方案都会创建多个线程,这些线程都会参与到消息的消费过程中,但各自的思路是不一样的。

我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。

这两种方案孰优孰劣呢?应该说是各有千秋。我总结了一下这两种方案的优缺点,我们先来看看下面这张表格。


推荐阅读

结合案例深入理解DDD聚合与聚合根

技术架构:作为开发,你真的了解系统吗-CSDN博客

相关文章:

Kafka 多线程开发消费者实例

目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka …...

Linux线程池实现

1.线程池实现 全部代码&#xff1a;whb-helloworld/113 1.唤醒线程 一个是唤醒全部线程&#xff0c;一个是唤醒一个线程。 void WakeUpAllThread(){LockGuard lockguard(_mutex);if (_sleepernum)_cond.Broadcast();LOG(LogLevel::INFO) << "唤醒所有的休眠线程&q…...

Linux《进程概念(上)》

在之前的Linux学习当中我们已经了解了基本的Linux指令以及基础的开发工具的使用&#xff0c;那么接下来我们就要开始Linux当中一个非常重要的部分的学习——进程&#xff0c;在此进程是我们之后Linux学习的基础&#xff0c;并且通过进程的学习会让我们了解更多的操作系统的相关…...

【算法】并查集基础讲解

一、定义 一种树型的数据结构&#xff0c;用于处理一些不相交集合的合并及查询问题。思想是用一个数组表示了整片森林&#xff08;parent&#xff09;&#xff0c;树的根节点唯一标识了一个集合&#xff0c;只要找到了某个元素的的树根&#xff0c;就能确定它在哪个集合里。 …...

C++ STL常用算法之常用集合算法

常用集合算法 学习目标: 掌握常用的集合算法 算法简介: set_intersection // 求两个容器的交集 set_union // 求两个容器的并集 set_difference // 求两个容器的差集 set_intersection 功能描述: 求两个容器的交集 函数原型: set_intersection(iterator beg1, iterat…...

Qt warning LNK4042: 对象被多次指定;已忽略多余的指定

一、常规原因&#xff1a; pro或pri 文件中源文件被多次包含 解决&#xff1a;删除变量 SOURCES 和 HEADERS 中重复条目 二、误用 对于某些pri库可以使用如下代码简写包含 INCLUDEPATH $$PWDHEADERS $$PWD/*.hSOURCES $$PWD/*.cpp但是假如该目录下只有头文件&#xff0c;没…...

ACM模式常用方法总结(Java篇)

文章目录 一、ACM输入输出模式二、重要语法2.1、导包2.2、读取数据2.3、判断是否有下一个数据2.4、输出2.5、关闭scanner2.6、易踩坑点 一、ACM输入输出模式 在力扣上编写代码时使用的是核心代码模式&#xff0c;如果在面试中遇到ACM模式就会比较迷茫&#xff1f;ACM模式要求你…...

日程公布| 第八届地球空间大数据与云计算前沿大会与集中学习(3号通知)

日程公布| 第八届地球空间大数据与云计算前沿大会与集中学习&#xff08;3号通知&#xff09; 日程公布| 第八届地球空间大数据与云计算前沿大会与集中学习&#xff08;3号通知&#xff09;...

leetcode 28 Find the Index of the First Occurrence in a String

直接用kmp算法 class Solution { public:int strStr(string haystack, string needle) {return kmp(haystack,needle);}int kmp(std::string &text,std::string &pattern){int n text.size();int m pattern.size();if(m 0)return 0;std::vector<int> next;ne…...

MATLAB中rmfield函数用法

目录 语法 说明 示例 删除单个字段 删除多个字段 rmfield函数的功能是删除结构体中的字段。 语法 s rmfield(s,field) 说明 s rmfield(s,field) 从结构体数组 s 中删除指定的一个或多个字段。使用字符向量元胞数组或字符串数组指定多个字段。s 的维度保持不变。 示例…...

Linux C语言调用第三方库,第三方库如何编译安装

在 Linux 环境下使用 C 语言调用第三方库时&#xff0c;通常需要先对第三方库进行编译和安装。以下为你详细介绍一般的编译安装步骤&#xff0c;并给出不同类型第三方库&#xff08;如使用 Makefile、CMake 构建系统&#xff09;的具体示例。 一般步骤 1. 获取第三方库源码 …...

leetcode -编辑距离

为了求解将 word1 转换成 word2 所需的最少操作数&#xff0c;可以使用动态规划。以下是详细的解决方案&#xff1a; ### 方法思路 1. **定义状态** dp[i][j] 表示将 word1 的前 i 个字符转换成 word2 的前 j 个字符所需的最少操作数。 2. **状态转移方程** - 如果 word1[…...

【Ollama】大模型运行框架

文章目录 安装与运行导入LLMHugginface模型-转换为-GGUF模型在指定gpu上运行model存储路径设置 ollama接口 官网 github中文介绍 安装与运行 安装教程 安装 wget https://ollama.com/download/ollama-linux-amd64.tgz tar -xzvf ollama-linux-amd64.tgz添加ollama的环境变量…...

字节开源版Manus来袭

字节开源版Manus来袭 项目地址&#xff1a;https://github.com/langmanus/langmanus/blob/main/README_zh.md 在人工智能领域&#xff0c;Manus的出现无疑是一颗重磅炸弹&#xff0c;它凭借强大的通用Agent能力&#xff0c;迅速吸引了全球开发者和AI爱好者的目光。然而&#…...

论文阅读笔记——PointVLA: Injecting the 3D World into Vision-Language-Action Models

PointVLA 论文 现有的 VLA 基于 2D 视觉-语言数据表现良好但缺乏 3D 几何先验导致空间推理缺陷。传统方案&#xff1a;1&#xff09;3D->2D 投影&#xff0c;造成几何信息损失&#xff1b;2&#xff09;3D 数据集少。PointVLA 保留原有 VLA&#xff0c;提取点云特征&#xf…...

selenium实现自动登录项目(5)

1、163邮箱自动登录功能 遇到的问题&#xff1a; 1、登录页面&#xff0c;在定位表单时候&#xff0c;采用id&#xff0c;xpath&#xff0c;css selector都无法定位成功&#xff0c;因为id后面有个随机生成的数字&#xff08;//*[id"x-URS-iframe1741925838640.6785&quo…...

在win11 环境下 新安装 WSL ubuntu + 换国内镜像源 + ssh + 桌面环境 + Pyhton 环境 + vim 设置插件安装

在win11 环境下 新安装 WSL ubuntu ssh gnome 桌面环境 Pyhton 环境 vim 设置插件安装 简单介绍详细流程换国内镜像源安装 ssh 桌面环境python 环境vim 设置插件安装 简单介绍 内容有点长&#xff0c;这里就先简单描述内容了。主要是快速在 Win11 搭建一个 wsl 的 linux 环…...

基于springboot课程学习与互动平台(源码+lw+部署文档+讲解),源码可白嫖!

摘要 随着我国经济的高速发展与人们生活水平的日益提高&#xff0c;人们对生活质量的追求也多种多样。尤其在人们生活节奏不断加快的当下&#xff0c;人们更趋向于足不出户解决生活上的问题&#xff0c;线上管理系统展现了其蓬勃生命力和广阔的前景。与此同时&#xff0c;在此…...

通俗易懂的大模型原理

十分钟揭秘DeepSeek原理&#xff0c;通俗易懂的大语言模型科普&#xff01;_哔哩哔哩_bilibili 最基础原理&#xff0c;x是输入&#xff0c;y是输出。上百万和上百亿的参数 将一句话转化为数字向量 一句话就是向量矩阵 输入矩阵和参数矩阵进行计算得出输出矩阵&#xff0c;因为…...

Vue 3 模板引用(Template Refs)详解与实战示例

Vue 3 模板引用&#xff08;Template Refs&#xff09;详解与实战示例 引言 在 Vue 开发中&#xff0c;通常推荐使用 响应式数据 (ref 和 reactive) 进行数据绑定&#xff0c;而不是直接操作 DOM。但是&#xff0c;在某些情况下&#xff0c;我们确实需要访问某个组件或 DOM 元…...

【Deep Reinforcement Learning Hands-On Third Edition】【序】

书名&#xff1a;深度强化学习实践 第三版 副标题&#xff1a;一个实用且容易跟得上的强化学习指南&#xff0c;从&#xff08;Q-learning和DQNs&#xff09;到&#xff08;PPO和RLHF&#xff09; 作者&#xff1a;Maxim Lapan 1.书中目录 模块一&#xff1a;强化学习初探 章…...

热门索尼S-Log3电影感氛围旅拍LUTS调色预设 Christian Mate Grab - Sony S-Log3 Cinematic LUTs

热门索尼S-Log3电影感氛围旅拍LUTS调色预设 Christian Mate Grab – Sony S-Log3 Cinematic LUTs 我们最好的 Film Look S-Log3 LUT 的集合&#xff0c;适用于索尼无反光镜相机。无论您是在户外、室内、风景还是旅行电影中拍摄&#xff0c;这些 LUT 都经过优化&#xff0c;可为…...

Hadoop/Spark 生态

Hadoop/Spark 生态是大数据处理的核心技术体系&#xff0c;专为解决海量数据的存储、计算和分析问题而设计。以下从底层原理到核心组件详细讲解&#xff0c;帮助你快速建立知识框架&#xff01; 一、为什么需要 Hadoop/Spark&#xff1f; ​传统单机瓶颈&#xff1a; 数据量超…...

.global

.global关键字用来让一个符号对链接器可见&#xff0c;可以供其他链接对象模块使用。 global是告诉编译器&#xff0c;其后是全局可见的名字【变量或函数名】。 .global start 让start符号成为可见的标示符&#xff0c;这样链接器就知道跳转到程序中的什么地方并开始执行。li…...

八股总结(Java)实时更新!

八股总结&#xff08;java&#xff09; ArrayList和LinkedList有什么区别 ArrayList底层是动态数组&#xff0c;LinkedList底层是双向链表&#xff1b;前者利于随机访问&#xff0c;后者利于头尾插入&#xff1b;前者内存连续分配&#xff0c;后者通过指针连接多块不连续的内存…...

@emotion/css + react+动态主题切换

1.下载插件 npm install --save emotion/css 2.创建ThemeContext.tsx // src/ThemeContext.tsx import React, { createContext, useContext, useState } from "react";// 定义主题类型 export type Theme "light" | "dark";// 定义主题上下…...

Python Cookbook-4.16 用字典分派方法和函数

任务 需要根据某个控制变量的值执行不同的代码片段——在其他的语言中你可能会使用case 语句。 解决方案 归功于面向对象编程的优雅的分派概念&#xff0c;case语句的使用大多(但不是所有)都可以被替换成其他分派形式。在Python中&#xff0c;字典及函数是一等(first-class)…...

亚马逊玩具品类技术驱动型选品策略:从趋势洞察到合规基建

一、全球玩具电商技术演进趋势 &#xff08;技术化重构原市场背景&#xff09; 数据可视化分析&#xff1a;通过亚马逊SP-API抓取2023年玩具品类GMV分布热力图 监管技术升级&#xff1a; 美国CPSC启用AI质检系统&#xff08;缺陷识别准确率92.7%&#xff09; 欧盟EPR合规接口…...

【jQuery】插件

目录 一、 jQuery插件 1. 瀑布流插件&#xff1a; jQuery 之家 http://www.htmleaf.com/ 2. 图片懒加载&#xff1a; jQuery 插件库 http://www.jq22.com/ 3. 全屏滚动 总结不易~ 本章节对我有很大收获&#xff0c;希望对你也是~~~ 一、 jQuery插件 jQuery 功能…...

MATLAB导入Excel数据

假如Excel中存在三列数据需要导入Matlab中。 保证该Excel文件与Matlab程序在同一目录下。 function [time, voltage, current] test(filename)% 读取Excel文件并提取时间、电压、电流数据% 输入参数:% filename: Excel文件名&#xff08;需包含路径&#xff0c;如C:\data\…...