当前位置: 首页 > news >正文

Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。

为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。

这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。

例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。

还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。

其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。

下面开始对KafkaConsumer的分析。

KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。

  • subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
  • assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
  • commit*()方法:提交消费者已经消费完成的offset。
  • seek*()方法:指定消费者起始消费的位置。
  • poll()方法:负责从服务端获取消息。
  • pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。

了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:Consumer的唯一标示。
  • coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
  • keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
  • fetcher:负责从服务端获取消息。
  • interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
  • client:负责消费者与Kafka服务端的网络通信。
  • subscriptions:维护了消费者的消费状态。
  • metadata:记录了整个Kafka集群的元信息。
  • currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。

在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。

相关文章:

Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。 为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。 这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中&#x…...

Spring | Spring中的Bean--下

Spring中的Bean: 4.Bean的生命周期5.Bean的配装配式 ( 添加Bean到IOC容器的方式 依赖注入的方式 )5.1 基于XML的配置5.2 基于Annotation (注解) 的装配 (更常用)5.3 自动装配 4.Bean的生命周期 Spring容器可以管理 singleton作用域的Bean的生命周期,在此…...

本周五上海见 第二届证券基金行业先进计算技术大会暨2024低时延技术创新实践论坛(上海站)即将召开

低时延技术是证券基金期货领域业务系统的核心技术,是打造极速交易系统领先优势的关键,也是证券基金行业关注的前沿技术热点。 1月19日下午,第二届证券基金行业先进计算技术大会暨2024低时延技术创新实践论坛(上海站)即…...

怎么安装IK分词器

.安装IK分词器 1.在线安装ik插件(较慢) # 进入容器内部 docker exec -it elasticsearch /bin/bash ​ # 在线下载并安装 ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elastics…...

【踩坑】flask_uploads报错cannot import name ‘secure_filename‘

转载请注明出处:小锋学长生活大爆炸[xfxuezhang.cn] 背景说明 截至目前,用新版的flask实现文件上传(用到flask_uploads库),会出现这个问题。 问题原因 版本问题,新的werkzeug已经把secure_filename的位置改了。 解决方法 手动修改…...

AI编程可视化Java项目拆解第一弹,解析本地Java项目

之前分享过一篇使用 AI 可视化 Java 项目的文章,同步在 AI 破局星球、知乎、掘金等地方都分享了。 原文在这里AI 编程:可视化 Java 项目 有很多人感兴趣,我打算写一个系列文章拆解这个项目,大家多多点赞支持~ 今天分享的是第一…...

使用arcgis pro是类似的控件样式 WPF

1.资源加载 <controls:ProWindow.Resources><ResourceDictionary><ResourceDictionary.MergedDictionaries><extensions:DesignOnlyResourceDictionary Source"pack://application:,,,/ArcGIS.Desktop.Framework;component\Themes\Default.xaml&quo…...

C语言所有字符串函数举例如何使用

strcpy: 将一个字符串复制到另一个字符串中 char source[] "Hello"; char destination[10]; strcpy(destination, source);strcat: 将一个字符串连接到另一个字符串的末尾 char str1[20] "Hello"; char str2[] "World"; strcat(str1, str2)…...

ArcGIS Pro 如何新建布局

你是否已经习惯了在ArcGIS中数据视图和布局视图之间来回切换&#xff0c;到了ArcGIS Pro中却找不到二者之间切换的按钮&#xff0c;即使新建布局后却发现地图怎么却是一片空白。 这一切的一切都是因为ArcGIS Pro的功能框架完全不同&#xff0c;这里为大家介绍一下在ArcGIS Pro…...

如何解决态势感知中的“时隐时现”问题

解决态势感知中的“时隐时现”问题有以下几个方法&#xff1a; 1、确保所有关键的监控设备和传感器正常运行&#xff0c;能够及时和准确地检测到各种异常情况。 2、引入先进的技术手段。例如使用人工智能和机器学习算法来识别和分析大量的数据&#xff0c;快速发现异常和威胁&a…...

为什么JavaScript中0.1 + 0.2 ≠ 0.3

JavaScript中的浮点数运算有时候会出现一点偏差。下面解释为什么0.1 0.2 ≠ 0.3,以及如果你需要精确运算应该怎么做。 如果1 2 3,那么为什么在JavaScript中0.1 0.2 ≠ 0.3?这个原因与计算机科学和浮点数运算有关。 我建议你打开浏览器的控制台,输入0.1 0.2来查看结果。…...

Unity关于纹理图片格式带来的内存问题和对预制体批量格式和大小减半处理

我们经常会遇到内存问题&#xff0c;这次就是遇到很多图片的默认格式被改成了RGB32&#xff0c;导致Android打包后运行内存明显增加。 发生了什么 打包Android后&#xff0c;发现经常崩溃&#xff0c;明显内存可能除了问题&#xff0c;看了内存后发现了问题。 见下图&#xf…...

2024美赛数学建模思路 - 案例:ID3-决策树分类算法

文章目录 0 赛题思路1 算法介绍2 FP树表示法3 构建FP树4 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 算法介绍 FP-Tree算法全称是FrequentPattern Tree算法&#xff0c;就是频繁模…...

GitHub图床搭建

1 准备Github账号 如果没有Github账号需要先在官网注册一个账号 2 创建仓库 在github上创建一个仓库&#xff0c;随便一个普通的仓库就行&#xff0c;选择公共仓库 并且配置github仓库的pages&#xff0c;选择默认访问的分支及默认路径 3 github token获取 github token创…...

DQN、Double DQN、Dueling DQN、Per DQN、NoisyDQN 学习笔记

文章目录 DQN (Deep Q-Network)说明伪代码应用范围 Double DQN说明伪代码应用范围 Dueling DQN实现原理应用范围伪代码 Per DQN (Prioritized Experience Replay DQN)应用范围伪代码 NoisyDQN伪代码应用范围 部分内容与图片摘自&#xff1a;JoyRL 、 EasyRL DQN (Deep Q-Networ…...

C++ 编程需要什么样的开发环境?

C 编程需要什么样的开发环境&#xff1f; 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「C的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#…...

Unity文字游戏开发日志(1)—— 打字机效果

作者是一名OIer,因为兴趣&#xff0c;想在寒假期间开发一款文字游戏的demo。 本博客仅用作记录&#xff0c;马蜂极度不符合规范。 但是&#xff0c;可以用来避坑。 1.等待功能——使用的是协程函数&#xff0c;且调用与常规调用函数不同。 private IEnumerator Sco(){isScoe…...

从0开始python学习-48.pytest框架之断言

目录 1. 响应进行断言 1.1 在yaml用例中写入断言内容 1.2 封装断言方法 1.3 在执行流程中加入断言判断内容 2. 数据库数据断言 2.1 在yaml用例中写入断言内容 2.2 连接数据库并封装执行sql的方法 2.3 封装后校验方法是否可执行 2.4 使用之前封装的断言方法&#xff0c…...

学习JavaEE的日子 day13补 深入类加载机制及底层

深入类加载机制 初识类加载过程 使用某个类时&#xff0c;如果该类的class文件没有加载到内存时&#xff0c;则系统会通过以下三个步骤来对该类进行初始化 1.类的加载&#xff08;Load&#xff09; → 2.类的连接&#xff08;Link&#xff09; → 3.类的初始化&#xff08;In…...

C# WebApi传参及Postman调试

概述 欢迎来到本文&#xff0c;本篇文章将会探讨C# WebApi中传递参数的方法。在WebApi中&#xff0c;参数传递是一个非常重要的概念&#xff0c;因为它使得我们能够从客户端获取数据&#xff0c;并将数据传递到服务器端进行处理。WebApi是一种使用HTTP协议进行通信的RESTful服…...

AI-调查研究-01-正念冥想有用吗?对健康的影响及科学指南

点一下关注吧&#xff01;&#xff01;&#xff01;非常感谢&#xff01;&#xff01;持续更新&#xff01;&#xff01;&#xff01; &#x1f680; AI篇持续更新中&#xff01;&#xff08;长期更新&#xff09; 目前2025年06月05日更新到&#xff1a; AI炼丹日志-28 - Aud…...

手游刚开服就被攻击怎么办?如何防御DDoS?

开服初期是手游最脆弱的阶段&#xff0c;极易成为DDoS攻击的目标。一旦遭遇攻击&#xff0c;可能导致服务器瘫痪、玩家流失&#xff0c;甚至造成巨大经济损失。本文为开发者提供一套简洁有效的应急与防御方案&#xff0c;帮助快速应对并构建长期防护体系。 一、遭遇攻击的紧急应…...

Qt/C++开发监控GB28181系统/取流协议/同时支持udp/tcp被动/tcp主动

一、前言说明 在2011版本的gb28181协议中&#xff0c;拉取视频流只要求udp方式&#xff0c;从2016开始要求新增支持tcp被动和tcp主动两种方式&#xff0c;udp理论上会丢包的&#xff0c;所以实际使用过程可能会出现画面花屏的情况&#xff0c;而tcp肯定不丢包&#xff0c;起码…...

day52 ResNet18 CBAM

在深度学习的旅程中&#xff0c;我们不断探索如何提升模型的性能。今天&#xff0c;我将分享我在 ResNet18 模型中插入 CBAM&#xff08;Convolutional Block Attention Module&#xff09;模块&#xff0c;并采用分阶段微调策略的实践过程。通过这个过程&#xff0c;我不仅提升…...

屋顶变身“发电站” ,中天合创屋面分布式光伏发电项目顺利并网!

5月28日&#xff0c;中天合创屋面分布式光伏发电项目顺利并网发电&#xff0c;该项目位于内蒙古自治区鄂尔多斯市乌审旗&#xff0c;项目利用中天合创聚乙烯、聚丙烯仓库屋面作为场地建设光伏电站&#xff0c;总装机容量为9.96MWp。 项目投运后&#xff0c;每年可节约标煤3670…...

跨链模式:多链互操作架构与性能扩展方案

跨链模式&#xff1a;多链互操作架构与性能扩展方案 ——构建下一代区块链互联网的技术基石 一、跨链架构的核心范式演进 1. 分层协议栈&#xff1a;模块化解耦设计 现代跨链系统采用分层协议栈实现灵活扩展&#xff08;H2Cross架构&#xff09;&#xff1a; 适配层&#xf…...

TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案

一、TRS收益互换的本质与业务逻辑 &#xff08;一&#xff09;概念解析 TRS&#xff08;Total Return Swap&#xff09;收益互换是一种金融衍生工具&#xff0c;指交易双方约定在未来一定期限内&#xff0c;基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...

数据库分批入库

今天在工作中&#xff0c;遇到一个问题&#xff0c;就是分批查询的时候&#xff0c;由于批次过大导致出现了一些问题&#xff0c;一下是问题描述和解决方案&#xff1a; 示例&#xff1a; // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别

OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...

C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。

1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj&#xff0c;再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...