当前位置: 首页 > news >正文

Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。

为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。

这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。

例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。

还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。

其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。

下面开始对KafkaConsumer的分析。

KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。

  • subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
  • assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
  • commit*()方法:提交消费者已经消费完成的offset。
  • seek*()方法:指定消费者起始消费的位置。
  • poll()方法:负责从服务端获取消息。
  • pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。

了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:Consumer的唯一标示。
  • coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
  • keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
  • fetcher:负责从服务端获取消息。
  • interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
  • client:负责消费者与Kafka服务端的网络通信。
  • subscriptions:维护了消费者的消费状态。
  • metadata:记录了整个Kafka集群的元信息。
  • currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。

在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。

相关文章:

Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。 为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。 这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中&#x…...

Spring | Spring中的Bean--下

Spring中的Bean: 4.Bean的生命周期5.Bean的配装配式 ( 添加Bean到IOC容器的方式 依赖注入的方式 )5.1 基于XML的配置5.2 基于Annotation (注解) 的装配 (更常用)5.3 自动装配 4.Bean的生命周期 Spring容器可以管理 singleton作用域的Bean的生命周期,在此…...

本周五上海见 第二届证券基金行业先进计算技术大会暨2024低时延技术创新实践论坛(上海站)即将召开

低时延技术是证券基金期货领域业务系统的核心技术,是打造极速交易系统领先优势的关键,也是证券基金行业关注的前沿技术热点。 1月19日下午,第二届证券基金行业先进计算技术大会暨2024低时延技术创新实践论坛(上海站)即…...

怎么安装IK分词器

.安装IK分词器 1.在线安装ik插件(较慢) # 进入容器内部 docker exec -it elasticsearch /bin/bash ​ # 在线下载并安装 ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elastics…...

【踩坑】flask_uploads报错cannot import name ‘secure_filename‘

转载请注明出处:小锋学长生活大爆炸[xfxuezhang.cn] 背景说明 截至目前,用新版的flask实现文件上传(用到flask_uploads库),会出现这个问题。 问题原因 版本问题,新的werkzeug已经把secure_filename的位置改了。 解决方法 手动修改…...

AI编程可视化Java项目拆解第一弹,解析本地Java项目

之前分享过一篇使用 AI 可视化 Java 项目的文章,同步在 AI 破局星球、知乎、掘金等地方都分享了。 原文在这里AI 编程:可视化 Java 项目 有很多人感兴趣,我打算写一个系列文章拆解这个项目,大家多多点赞支持~ 今天分享的是第一…...

使用arcgis pro是类似的控件样式 WPF

1.资源加载 <controls:ProWindow.Resources><ResourceDictionary><ResourceDictionary.MergedDictionaries><extensions:DesignOnlyResourceDictionary Source"pack://application:,,,/ArcGIS.Desktop.Framework;component\Themes\Default.xaml&quo…...

C语言所有字符串函数举例如何使用

strcpy: 将一个字符串复制到另一个字符串中 char source[] "Hello"; char destination[10]; strcpy(destination, source);strcat: 将一个字符串连接到另一个字符串的末尾 char str1[20] "Hello"; char str2[] "World"; strcat(str1, str2)…...

ArcGIS Pro 如何新建布局

你是否已经习惯了在ArcGIS中数据视图和布局视图之间来回切换&#xff0c;到了ArcGIS Pro中却找不到二者之间切换的按钮&#xff0c;即使新建布局后却发现地图怎么却是一片空白。 这一切的一切都是因为ArcGIS Pro的功能框架完全不同&#xff0c;这里为大家介绍一下在ArcGIS Pro…...

如何解决态势感知中的“时隐时现”问题

解决态势感知中的“时隐时现”问题有以下几个方法&#xff1a; 1、确保所有关键的监控设备和传感器正常运行&#xff0c;能够及时和准确地检测到各种异常情况。 2、引入先进的技术手段。例如使用人工智能和机器学习算法来识别和分析大量的数据&#xff0c;快速发现异常和威胁&a…...

为什么JavaScript中0.1 + 0.2 ≠ 0.3

JavaScript中的浮点数运算有时候会出现一点偏差。下面解释为什么0.1 0.2 ≠ 0.3,以及如果你需要精确运算应该怎么做。 如果1 2 3,那么为什么在JavaScript中0.1 0.2 ≠ 0.3?这个原因与计算机科学和浮点数运算有关。 我建议你打开浏览器的控制台,输入0.1 0.2来查看结果。…...

Unity关于纹理图片格式带来的内存问题和对预制体批量格式和大小减半处理

我们经常会遇到内存问题&#xff0c;这次就是遇到很多图片的默认格式被改成了RGB32&#xff0c;导致Android打包后运行内存明显增加。 发生了什么 打包Android后&#xff0c;发现经常崩溃&#xff0c;明显内存可能除了问题&#xff0c;看了内存后发现了问题。 见下图&#xf…...

2024美赛数学建模思路 - 案例:ID3-决策树分类算法

文章目录 0 赛题思路1 算法介绍2 FP树表示法3 构建FP树4 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 算法介绍 FP-Tree算法全称是FrequentPattern Tree算法&#xff0c;就是频繁模…...

GitHub图床搭建

1 准备Github账号 如果没有Github账号需要先在官网注册一个账号 2 创建仓库 在github上创建一个仓库&#xff0c;随便一个普通的仓库就行&#xff0c;选择公共仓库 并且配置github仓库的pages&#xff0c;选择默认访问的分支及默认路径 3 github token获取 github token创…...

DQN、Double DQN、Dueling DQN、Per DQN、NoisyDQN 学习笔记

文章目录 DQN (Deep Q-Network)说明伪代码应用范围 Double DQN说明伪代码应用范围 Dueling DQN实现原理应用范围伪代码 Per DQN (Prioritized Experience Replay DQN)应用范围伪代码 NoisyDQN伪代码应用范围 部分内容与图片摘自&#xff1a;JoyRL 、 EasyRL DQN (Deep Q-Networ…...

C++ 编程需要什么样的开发环境?

C 编程需要什么样的开发环境&#xff1f; 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「C的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#…...

Unity文字游戏开发日志(1)—— 打字机效果

作者是一名OIer,因为兴趣&#xff0c;想在寒假期间开发一款文字游戏的demo。 本博客仅用作记录&#xff0c;马蜂极度不符合规范。 但是&#xff0c;可以用来避坑。 1.等待功能——使用的是协程函数&#xff0c;且调用与常规调用函数不同。 private IEnumerator Sco(){isScoe…...

从0开始python学习-48.pytest框架之断言

目录 1. 响应进行断言 1.1 在yaml用例中写入断言内容 1.2 封装断言方法 1.3 在执行流程中加入断言判断内容 2. 数据库数据断言 2.1 在yaml用例中写入断言内容 2.2 连接数据库并封装执行sql的方法 2.3 封装后校验方法是否可执行 2.4 使用之前封装的断言方法&#xff0c…...

学习JavaEE的日子 day13补 深入类加载机制及底层

深入类加载机制 初识类加载过程 使用某个类时&#xff0c;如果该类的class文件没有加载到内存时&#xff0c;则系统会通过以下三个步骤来对该类进行初始化 1.类的加载&#xff08;Load&#xff09; → 2.类的连接&#xff08;Link&#xff09; → 3.类的初始化&#xff08;In…...

C# WebApi传参及Postman调试

概述 欢迎来到本文&#xff0c;本篇文章将会探讨C# WebApi中传递参数的方法。在WebApi中&#xff0c;参数传递是一个非常重要的概念&#xff0c;因为它使得我们能够从客户端获取数据&#xff0c;并将数据传递到服务器端进行处理。WebApi是一种使用HTTP协议进行通信的RESTful服…...

从电网到实验室——10kW大功率电源的Psim仿真实战

基于Psim的Boost型 PFC移相全桥AC-DC电源设计仿真 1、前级电网输入220AC&#xff0c;50Hz&#xff0c;中间级母线电压为600V&#xff0c;后级600V输入&#xff0c;547V输出&#xff0c;电压可调&#xff0c;功率10kW 2、前级基于Boost电路PFC&#xff0c;平均电流控制&#xff…...

百川2-13B-4bits模型微调指南:提升OpenClaw任务执行准确率

百川2-13B-4bits模型微调指南&#xff1a;提升OpenClaw任务执行准确率 1. 为什么需要微调百川模型&#xff1f; 去年夏天&#xff0c;当我第一次用OpenClaw自动化整理电脑上的数千份文档时&#xff0c;遇到了一个尴尬的问题——AI经常把技术文档和私人照片混在一起归类。这让…...

Minica 源码解读:深入理解证书生成的核心算法

Minica 源码解读&#xff1a;深入理解证书生成的核心算法 【免费下载链接】minica minica is a small, simple CA intended for use in situations where the CA operator also operates each host where a certificate will be used. 项目地址: https://gitcode.com/gh_mirr…...

深度解析Mi-Create:开源智能手表表盘编辑器的完整实践指南

深度解析Mi-Create&#xff1a;开源智能手表表盘编辑器的完整实践指南 【免费下载链接】Mi-Create Unofficial watchface creator for Xiaomi wearables ~2021 and above 项目地址: https://gitcode.com/gh_mirrors/mi/Mi-Create 项目愿景与定位 在智能穿戴设备快速发展…...

MxRadioRF2xx库:ARM Mbed平台RF2xx射频驱动开发指南

1. MxRadioRF2xx 库概述 MxRadioRF2xx 是一个专为 ARM Mbed OS 平台设计的 Atmel&#xff08;现 Microchip&#xff09;RF2xx 系列射频收发器驱动库。该库并非对底层寄存器操作的简单封装&#xff0c;而是面向嵌入式无线应用开发者的工程化抽象层&#xff0c;其核心目标是&…...

从CentOS 7迁移到Ubuntu 22.04 LTS,我整理了一份保姆级系统初始化脚本(含内核调优、换源、时区设置)

从CentOS 7迁移到Ubuntu 22.04 LTS&#xff1a;系统初始化与性能调优全指南 当CentOS 7走向生命周期的终点&#xff0c;许多运维团队正面临向新平台的战略转移。Ubuntu 22.04 LTS以其长期支持特性和活跃的社区生态&#xff0c;成为最受欢迎的替代选择之一。但迁移绝非简单的系统…...

HRNet的‘并行多分支’到底强在哪?一个动画图解带你彻底搞懂特征融合机制

HRNet并行多分支架构的视觉化解析&#xff1a;如何通过双向特征融合突破关键点检测精度瓶颈 在计算机视觉领域&#xff0c;关键点检测任务&#xff08;如人体姿态估计、人脸特征点定位&#xff09;对空间精度的要求近乎苛刻。传统卷积神经网络通过层层下采样提取语义特征的代价…...

你用AI写代码时,是不是总觉得“它懂语法,却搞不定真实工程”?Composer 2的答案在这里

很多开发者都有过这种体验&#xff1a;把一个真实项目需求甩给AI&#xff0c;它能秒出语法完美的代码片段&#xff0c;可一到大型代码库、遗留系统、多文件联动的时候&#xff0c;就开始原地打转。改了半天核心逻辑没动&#xff0c;引入新问题&#xff0c;或者干脆在长链条任务…...

用腾讯云轻量锐驰和对象存储,手把手教你30分钟搞定私人不限速网盘(附SSL证书配置)

零基础30分钟搭建高性能私人网盘&#xff1a;腾讯云轻量锐驰对象存储实战指南 你是否也受够了公有网盘动辄几百KB的下载速度&#xff1f;每次分享文件给朋友&#xff0c;对方总要忍受龟速下载的煎熬。更别提那些突然消失的文件和频繁弹出的会员广告——是时候拥有一个完全自主掌…...

PingFangSC 字体技术深度解析:现代Web字体架构实践指南

PingFangSC 字体技术深度解析&#xff1a;现代Web字体架构实践指南 【免费下载链接】PingFangSC PingFangSC字体包文件、苹果平方字体文件&#xff0c;包含ttf和woff2格式 项目地址: https://gitcode.com/gh_mirrors/pi/PingFangSC PingFangSC&#xff08;苹方-简&#…...