当前位置: 首页 > news >正文

Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。

为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。

这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。

例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。

还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。

其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。

下面开始对KafkaConsumer的分析。

KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。

  • subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
  • assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
  • commit*()方法:提交消费者已经消费完成的offset。
  • seek*()方法:指定消费者起始消费的位置。
  • poll()方法:负责从服务端获取消息。
  • pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。

了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:Consumer的唯一标示。
  • coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
  • keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
  • fetcher:负责从服务端获取消息。
  • interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
  • client:负责消费者与Kafka服务端的网络通信。
  • subscriptions:维护了消费者的消费状态。
  • metadata:记录了整个Kafka集群的元信息。
  • currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。

在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。

相关文章:

Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。 为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。 这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中&#x…...

Spring | Spring中的Bean--下

Spring中的Bean: 4.Bean的生命周期5.Bean的配装配式 ( 添加Bean到IOC容器的方式 依赖注入的方式 )5.1 基于XML的配置5.2 基于Annotation (注解) 的装配 (更常用)5.3 自动装配 4.Bean的生命周期 Spring容器可以管理 singleton作用域的Bean的生命周期,在此…...

本周五上海见 第二届证券基金行业先进计算技术大会暨2024低时延技术创新实践论坛(上海站)即将召开

低时延技术是证券基金期货领域业务系统的核心技术,是打造极速交易系统领先优势的关键,也是证券基金行业关注的前沿技术热点。 1月19日下午,第二届证券基金行业先进计算技术大会暨2024低时延技术创新实践论坛(上海站)即…...

怎么安装IK分词器

.安装IK分词器 1.在线安装ik插件(较慢) # 进入容器内部 docker exec -it elasticsearch /bin/bash ​ # 在线下载并安装 ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elastics…...

【踩坑】flask_uploads报错cannot import name ‘secure_filename‘

转载请注明出处:小锋学长生活大爆炸[xfxuezhang.cn] 背景说明 截至目前,用新版的flask实现文件上传(用到flask_uploads库),会出现这个问题。 问题原因 版本问题,新的werkzeug已经把secure_filename的位置改了。 解决方法 手动修改…...

AI编程可视化Java项目拆解第一弹,解析本地Java项目

之前分享过一篇使用 AI 可视化 Java 项目的文章,同步在 AI 破局星球、知乎、掘金等地方都分享了。 原文在这里AI 编程:可视化 Java 项目 有很多人感兴趣,我打算写一个系列文章拆解这个项目,大家多多点赞支持~ 今天分享的是第一…...

使用arcgis pro是类似的控件样式 WPF

1.资源加载 <controls:ProWindow.Resources><ResourceDictionary><ResourceDictionary.MergedDictionaries><extensions:DesignOnlyResourceDictionary Source"pack://application:,,,/ArcGIS.Desktop.Framework;component\Themes\Default.xaml&quo…...

C语言所有字符串函数举例如何使用

strcpy: 将一个字符串复制到另一个字符串中 char source[] "Hello"; char destination[10]; strcpy(destination, source);strcat: 将一个字符串连接到另一个字符串的末尾 char str1[20] "Hello"; char str2[] "World"; strcat(str1, str2)…...

ArcGIS Pro 如何新建布局

你是否已经习惯了在ArcGIS中数据视图和布局视图之间来回切换&#xff0c;到了ArcGIS Pro中却找不到二者之间切换的按钮&#xff0c;即使新建布局后却发现地图怎么却是一片空白。 这一切的一切都是因为ArcGIS Pro的功能框架完全不同&#xff0c;这里为大家介绍一下在ArcGIS Pro…...

如何解决态势感知中的“时隐时现”问题

解决态势感知中的“时隐时现”问题有以下几个方法&#xff1a; 1、确保所有关键的监控设备和传感器正常运行&#xff0c;能够及时和准确地检测到各种异常情况。 2、引入先进的技术手段。例如使用人工智能和机器学习算法来识别和分析大量的数据&#xff0c;快速发现异常和威胁&a…...

为什么JavaScript中0.1 + 0.2 ≠ 0.3

JavaScript中的浮点数运算有时候会出现一点偏差。下面解释为什么0.1 0.2 ≠ 0.3,以及如果你需要精确运算应该怎么做。 如果1 2 3,那么为什么在JavaScript中0.1 0.2 ≠ 0.3?这个原因与计算机科学和浮点数运算有关。 我建议你打开浏览器的控制台,输入0.1 0.2来查看结果。…...

Unity关于纹理图片格式带来的内存问题和对预制体批量格式和大小减半处理

我们经常会遇到内存问题&#xff0c;这次就是遇到很多图片的默认格式被改成了RGB32&#xff0c;导致Android打包后运行内存明显增加。 发生了什么 打包Android后&#xff0c;发现经常崩溃&#xff0c;明显内存可能除了问题&#xff0c;看了内存后发现了问题。 见下图&#xf…...

2024美赛数学建模思路 - 案例:ID3-决策树分类算法

文章目录 0 赛题思路1 算法介绍2 FP树表示法3 构建FP树4 实现代码 建模资料 0 赛题思路 &#xff08;赛题出来以后第一时间在CSDN分享&#xff09; https://blog.csdn.net/dc_sinor?typeblog 1 算法介绍 FP-Tree算法全称是FrequentPattern Tree算法&#xff0c;就是频繁模…...

GitHub图床搭建

1 准备Github账号 如果没有Github账号需要先在官网注册一个账号 2 创建仓库 在github上创建一个仓库&#xff0c;随便一个普通的仓库就行&#xff0c;选择公共仓库 并且配置github仓库的pages&#xff0c;选择默认访问的分支及默认路径 3 github token获取 github token创…...

DQN、Double DQN、Dueling DQN、Per DQN、NoisyDQN 学习笔记

文章目录 DQN (Deep Q-Network)说明伪代码应用范围 Double DQN说明伪代码应用范围 Dueling DQN实现原理应用范围伪代码 Per DQN (Prioritized Experience Replay DQN)应用范围伪代码 NoisyDQN伪代码应用范围 部分内容与图片摘自&#xff1a;JoyRL 、 EasyRL DQN (Deep Q-Networ…...

C++ 编程需要什么样的开发环境?

C 编程需要什么样的开发环境&#xff1f; 在开始前我有一些资料&#xff0c;是我根据网友给的问题精心整理了一份「C的资料从专业入门到高级教程」&#xff0c; 点个关注在评论区回复“888”之后私信回复“888”&#xff0c;全部无偿共享给大家&#xff01;&#xff01;&#…...

Unity文字游戏开发日志(1)—— 打字机效果

作者是一名OIer,因为兴趣&#xff0c;想在寒假期间开发一款文字游戏的demo。 本博客仅用作记录&#xff0c;马蜂极度不符合规范。 但是&#xff0c;可以用来避坑。 1.等待功能——使用的是协程函数&#xff0c;且调用与常规调用函数不同。 private IEnumerator Sco(){isScoe…...

从0开始python学习-48.pytest框架之断言

目录 1. 响应进行断言 1.1 在yaml用例中写入断言内容 1.2 封装断言方法 1.3 在执行流程中加入断言判断内容 2. 数据库数据断言 2.1 在yaml用例中写入断言内容 2.2 连接数据库并封装执行sql的方法 2.3 封装后校验方法是否可执行 2.4 使用之前封装的断言方法&#xff0c…...

学习JavaEE的日子 day13补 深入类加载机制及底层

深入类加载机制 初识类加载过程 使用某个类时&#xff0c;如果该类的class文件没有加载到内存时&#xff0c;则系统会通过以下三个步骤来对该类进行初始化 1.类的加载&#xff08;Load&#xff09; → 2.类的连接&#xff08;Link&#xff09; → 3.类的初始化&#xff08;In…...

C# WebApi传参及Postman调试

概述 欢迎来到本文&#xff0c;本篇文章将会探讨C# WebApi中传递参数的方法。在WebApi中&#xff0c;参数传递是一个非常重要的概念&#xff0c;因为它使得我们能够从客户端获取数据&#xff0c;并将数据传递到服务器端进行处理。WebApi是一种使用HTTP协议进行通信的RESTful服…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

React 第五十五节 Router 中 useAsyncError的使用详解

前言 useAsyncError 是 React Router v6.4 引入的一个钩子&#xff0c;用于处理异步操作&#xff08;如数据加载&#xff09;中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误&#xff1a;捕获在 loader 或 action 中发生的异步错误替…...

Java 语言特性(面试系列1)

一、面向对象编程 1. 封装&#xff08;Encapsulation&#xff09; 定义&#xff1a;将数据&#xff08;属性&#xff09;和操作数据的方法绑定在一起&#xff0c;通过访问控制符&#xff08;private、protected、public&#xff09;隐藏内部实现细节。示例&#xff1a; public …...

大语言模型如何处理长文本?常用文本分割技术详解

为什么需要文本分割? 引言:为什么需要文本分割?一、基础文本分割方法1. 按段落分割(Paragraph Splitting)2. 按句子分割(Sentence Splitting)二、高级文本分割策略3. 重叠分割(Sliding Window)4. 递归分割(Recursive Splitting)三、生产级工具推荐5. 使用LangChain的…...

企业如何增强终端安全?

在数字化转型加速的今天&#xff0c;企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机&#xff0c;到工厂里的物联网设备、智能传感器&#xff0c;这些终端构成了企业与外部世界连接的 “神经末梢”。然而&#xff0c;随着远程办公的常态化和设备接入的爆炸式…...

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据

微软PowerBI考试 PL300-在 Power BI 中清理、转换和加载数据 Power Query 具有大量专门帮助您清理和准备数据以供分析的功能。 您将了解如何简化复杂模型、更改数据类型、重命名对象和透视数据。 您还将了解如何分析列&#xff0c;以便知晓哪些列包含有价值的数据&#xff0c;…...

基于SpringBoot在线拍卖系统的设计和实现

摘 要 随着社会的发展&#xff0c;社会的各行各业都在利用信息化时代的优势。计算机的优势和普及使得各种信息系统的开发成为必需。 在线拍卖系统&#xff0c;主要的模块包括管理员&#xff1b;首页、个人中心、用户管理、商品类型管理、拍卖商品管理、历史竞拍管理、竞拍订单…...

RSS 2025|从说明书学习复杂机器人操作任务:NUS邵林团队提出全新机器人装配技能学习框架Manual2Skill

视觉语言模型&#xff08;Vision-Language Models, VLMs&#xff09;&#xff0c;为真实环境中的机器人操作任务提供了极具潜力的解决方案。 尽管 VLMs 取得了显著进展&#xff0c;机器人仍难以胜任复杂的长时程任务&#xff08;如家具装配&#xff09;&#xff0c;主要受限于人…...

Redis:现代应用开发的高效内存数据存储利器

一、Redis的起源与发展 Redis最初由意大利程序员Salvatore Sanfilippo在2009年开发&#xff0c;其初衷是为了满足他自己的一个项目需求&#xff0c;即需要一个高性能的键值存储系统来解决传统数据库在高并发场景下的性能瓶颈。随着项目的开源&#xff0c;Redis凭借其简单易用、…...

深入浅出Diffusion模型:从原理到实践的全方位教程

I. 引言&#xff1a;生成式AI的黎明 – Diffusion模型是什么&#xff1f; 近年来&#xff0c;生成式人工智能&#xff08;Generative AI&#xff09;领域取得了爆炸性的进展&#xff0c;模型能够根据简单的文本提示创作出逼真的图像、连贯的文本&#xff0c;乃至更多令人惊叹的…...