Kafka-消费者-KafkaConsumer分析
与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。
为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。
这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。
例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。
还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。
其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。
下面开始对KafkaConsumer的分析。
KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。
- subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
- assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
- commit*()方法:提交消费者已经消费完成的offset。
- seek*()方法:指定消费者起始消费的位置。
- poll()方法:负责从服务端获取消息。
- pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。
了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。

- PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
- clientld:Consumer的唯一标示。
- coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
- keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
- fetcher:负责从服务端获取消息。
- interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
- client:负责消费者与Kafka服务端的网络通信。
- subscriptions:维护了消费者的消费状态。
- metadata:记录了整个Kafka集群的元信息。
- currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。
在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。
相关文章:
Kafka-消费者-KafkaConsumer分析
与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。 为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。 这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中&#x…...
Spring | Spring中的Bean--下
Spring中的Bean: 4.Bean的生命周期5.Bean的配装配式 ( 添加Bean到IOC容器的方式 依赖注入的方式 )5.1 基于XML的配置5.2 基于Annotation (注解) 的装配 (更常用)5.3 自动装配 4.Bean的生命周期 Spring容器可以管理 singleton作用域的Bean的生命周期,在此…...
本周五上海见 第二届证券基金行业先进计算技术大会暨2024低时延技术创新实践论坛(上海站)即将召开
低时延技术是证券基金期货领域业务系统的核心技术,是打造极速交易系统领先优势的关键,也是证券基金行业关注的前沿技术热点。 1月19日下午,第二届证券基金行业先进计算技术大会暨2024低时延技术创新实践论坛(上海站)即…...
怎么安装IK分词器
.安装IK分词器 1.在线安装ik插件(较慢) # 进入容器内部 docker exec -it elasticsearch /bin/bash # 在线下载并安装 ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elastics…...
【踩坑】flask_uploads报错cannot import name ‘secure_filename‘
转载请注明出处:小锋学长生活大爆炸[xfxuezhang.cn] 背景说明 截至目前,用新版的flask实现文件上传(用到flask_uploads库),会出现这个问题。 问题原因 版本问题,新的werkzeug已经把secure_filename的位置改了。 解决方法 手动修改…...
AI编程可视化Java项目拆解第一弹,解析本地Java项目
之前分享过一篇使用 AI 可视化 Java 项目的文章,同步在 AI 破局星球、知乎、掘金等地方都分享了。 原文在这里AI 编程:可视化 Java 项目 有很多人感兴趣,我打算写一个系列文章拆解这个项目,大家多多点赞支持~ 今天分享的是第一…...
使用arcgis pro是类似的控件样式 WPF
1.资源加载 <controls:ProWindow.Resources><ResourceDictionary><ResourceDictionary.MergedDictionaries><extensions:DesignOnlyResourceDictionary Source"pack://application:,,,/ArcGIS.Desktop.Framework;component\Themes\Default.xaml&quo…...
C语言所有字符串函数举例如何使用
strcpy: 将一个字符串复制到另一个字符串中 char source[] "Hello"; char destination[10]; strcpy(destination, source);strcat: 将一个字符串连接到另一个字符串的末尾 char str1[20] "Hello"; char str2[] "World"; strcat(str1, str2)…...
ArcGIS Pro 如何新建布局
你是否已经习惯了在ArcGIS中数据视图和布局视图之间来回切换,到了ArcGIS Pro中却找不到二者之间切换的按钮,即使新建布局后却发现地图怎么却是一片空白。 这一切的一切都是因为ArcGIS Pro的功能框架完全不同,这里为大家介绍一下在ArcGIS Pro…...
如何解决态势感知中的“时隐时现”问题
解决态势感知中的“时隐时现”问题有以下几个方法: 1、确保所有关键的监控设备和传感器正常运行,能够及时和准确地检测到各种异常情况。 2、引入先进的技术手段。例如使用人工智能和机器学习算法来识别和分析大量的数据,快速发现异常和威胁&a…...
为什么JavaScript中0.1 + 0.2 ≠ 0.3
JavaScript中的浮点数运算有时候会出现一点偏差。下面解释为什么0.1 0.2 ≠ 0.3,以及如果你需要精确运算应该怎么做。 如果1 2 3,那么为什么在JavaScript中0.1 0.2 ≠ 0.3?这个原因与计算机科学和浮点数运算有关。 我建议你打开浏览器的控制台,输入0.1 0.2来查看结果。…...
Unity关于纹理图片格式带来的内存问题和对预制体批量格式和大小减半处理
我们经常会遇到内存问题,这次就是遇到很多图片的默认格式被改成了RGB32,导致Android打包后运行内存明显增加。 发生了什么 打包Android后,发现经常崩溃,明显内存可能除了问题,看了内存后发现了问题。 见下图…...
2024美赛数学建模思路 - 案例:ID3-决策树分类算法
文章目录 0 赛题思路1 算法介绍2 FP树表示法3 构建FP树4 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 算法介绍 FP-Tree算法全称是FrequentPattern Tree算法,就是频繁模…...
GitHub图床搭建
1 准备Github账号 如果没有Github账号需要先在官网注册一个账号 2 创建仓库 在github上创建一个仓库,随便一个普通的仓库就行,选择公共仓库 并且配置github仓库的pages,选择默认访问的分支及默认路径 3 github token获取 github token创…...
DQN、Double DQN、Dueling DQN、Per DQN、NoisyDQN 学习笔记
文章目录 DQN (Deep Q-Network)说明伪代码应用范围 Double DQN说明伪代码应用范围 Dueling DQN实现原理应用范围伪代码 Per DQN (Prioritized Experience Replay DQN)应用范围伪代码 NoisyDQN伪代码应用范围 部分内容与图片摘自:JoyRL 、 EasyRL DQN (Deep Q-Networ…...
C++ 编程需要什么样的开发环境?
C 编程需要什么样的开发环境? 在开始前我有一些资料,是我根据网友给的问题精心整理了一份「C的资料从专业入门到高级教程」, 点个关注在评论区回复“888”之后私信回复“888”,全部无偿共享给大家!!&#…...
Unity文字游戏开发日志(1)—— 打字机效果
作者是一名OIer,因为兴趣,想在寒假期间开发一款文字游戏的demo。 本博客仅用作记录,马蜂极度不符合规范。 但是,可以用来避坑。 1.等待功能——使用的是协程函数,且调用与常规调用函数不同。 private IEnumerator Sco(){isScoe…...
从0开始python学习-48.pytest框架之断言
目录 1. 响应进行断言 1.1 在yaml用例中写入断言内容 1.2 封装断言方法 1.3 在执行流程中加入断言判断内容 2. 数据库数据断言 2.1 在yaml用例中写入断言内容 2.2 连接数据库并封装执行sql的方法 2.3 封装后校验方法是否可执行 2.4 使用之前封装的断言方法,…...
学习JavaEE的日子 day13补 深入类加载机制及底层
深入类加载机制 初识类加载过程 使用某个类时,如果该类的class文件没有加载到内存时,则系统会通过以下三个步骤来对该类进行初始化 1.类的加载(Load) → 2.类的连接(Link) → 3.类的初始化(In…...
C# WebApi传参及Postman调试
概述 欢迎来到本文,本篇文章将会探讨C# WebApi中传递参数的方法。在WebApi中,参数传递是一个非常重要的概念,因为它使得我们能够从客户端获取数据,并将数据传递到服务器端进行处理。WebApi是一种使用HTTP协议进行通信的RESTful服…...
挑战杯推荐项目
“人工智能”创意赛 - 智能艺术创作助手:借助大模型技术,开发能根据用户输入的主题、风格等要求,生成绘画、音乐、文学作品等多种形式艺术创作灵感或初稿的应用,帮助艺术家和创意爱好者激发创意、提高创作效率。 - 个性化梦境…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...
DAY 47
三、通道注意力 3.1 通道注意力的定义 # 新增:通道注意力模块(SE模块) class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...
Linux-07 ubuntu 的 chrome 启动不了
文章目录 问题原因解决步骤一、卸载旧版chrome二、重新安装chorme三、启动不了,报错如下四、启动不了,解决如下 总结 问题原因 在应用中可以看到chrome,但是打不开(说明:原来的ubuntu系统出问题了,这个是备用的硬盘&a…...
自然语言处理——循环神经网络
自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元(GRU)长短期记忆神经网络(LSTM)…...
SpringTask-03.入门案例
一.入门案例 启动类: package com.sky;import lombok.extern.slf4j.Slf4j; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cache.annotation.EnableCach…...
《C++ 模板》
目录 函数模板 类模板 非类型模板参数 模板特化 函数模板特化 类模板的特化 模板,就像一个模具,里面可以将不同类型的材料做成一个形状,其分为函数模板和类模板。 函数模板 函数模板可以简化函数重载的代码。格式:templa…...
【JVM面试篇】高频八股汇总——类加载和类加载器
目录 1. 讲一下类加载过程? 2. Java创建对象的过程? 3. 对象的生命周期? 4. 类加载器有哪些? 5. 双亲委派模型的作用(好处)? 6. 讲一下类的加载和双亲委派原则? 7. 双亲委派模…...
Python 实现 Web 静态服务器(HTTP 协议)
目录 一、在本地启动 HTTP 服务器1. Windows 下安装 node.js1)下载安装包2)配置环境变量3)安装镜像4)node.js 的常用命令 2. 安装 http-server 服务3. 使用 http-server 开启服务1)使用 http-server2)详解 …...
