Kafka-消费者-KafkaConsumer分析
与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。
为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。
这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。
例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。
还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。
其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。
下面开始对KafkaConsumer的分析。
KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。
- subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
- assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
- commit*()方法:提交消费者已经消费完成的offset。
- seek*()方法:指定消费者起始消费的位置。
- poll()方法:负责从服务端获取消息。
- pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。
了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。
- PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
- clientld:Consumer的唯一标示。
- coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
- keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
- fetcher:负责从服务端获取消息。
- interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
- client:负责消费者与Kafka服务端的网络通信。
- subscriptions:维护了消费者的消费状态。
- metadata:记录了整个Kafka集群的元信息。
- currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。
在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。
相关文章:

Kafka-消费者-KafkaConsumer分析
与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。 为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。 这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中&#x…...

Spring | Spring中的Bean--下
Spring中的Bean: 4.Bean的生命周期5.Bean的配装配式 ( 添加Bean到IOC容器的方式 依赖注入的方式 )5.1 基于XML的配置5.2 基于Annotation (注解) 的装配 (更常用)5.3 自动装配 4.Bean的生命周期 Spring容器可以管理 singleton作用域的Bean的生命周期,在此…...

本周五上海见 第二届证券基金行业先进计算技术大会暨2024低时延技术创新实践论坛(上海站)即将召开
低时延技术是证券基金期货领域业务系统的核心技术,是打造极速交易系统领先优势的关键,也是证券基金行业关注的前沿技术热点。 1月19日下午,第二届证券基金行业先进计算技术大会暨2024低时延技术创新实践论坛(上海站)即…...

怎么安装IK分词器
.安装IK分词器 1.在线安装ik插件(较慢) # 进入容器内部 docker exec -it elasticsearch /bin/bash # 在线下载并安装 ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.12.1/elastics…...

【踩坑】flask_uploads报错cannot import name ‘secure_filename‘
转载请注明出处:小锋学长生活大爆炸[xfxuezhang.cn] 背景说明 截至目前,用新版的flask实现文件上传(用到flask_uploads库),会出现这个问题。 问题原因 版本问题,新的werkzeug已经把secure_filename的位置改了。 解决方法 手动修改…...

AI编程可视化Java项目拆解第一弹,解析本地Java项目
之前分享过一篇使用 AI 可视化 Java 项目的文章,同步在 AI 破局星球、知乎、掘金等地方都分享了。 原文在这里AI 编程:可视化 Java 项目 有很多人感兴趣,我打算写一个系列文章拆解这个项目,大家多多点赞支持~ 今天分享的是第一…...

使用arcgis pro是类似的控件样式 WPF
1.资源加载 <controls:ProWindow.Resources><ResourceDictionary><ResourceDictionary.MergedDictionaries><extensions:DesignOnlyResourceDictionary Source"pack://application:,,,/ArcGIS.Desktop.Framework;component\Themes\Default.xaml&quo…...
C语言所有字符串函数举例如何使用
strcpy: 将一个字符串复制到另一个字符串中 char source[] "Hello"; char destination[10]; strcpy(destination, source);strcat: 将一个字符串连接到另一个字符串的末尾 char str1[20] "Hello"; char str2[] "World"; strcat(str1, str2)…...

ArcGIS Pro 如何新建布局
你是否已经习惯了在ArcGIS中数据视图和布局视图之间来回切换,到了ArcGIS Pro中却找不到二者之间切换的按钮,即使新建布局后却发现地图怎么却是一片空白。 这一切的一切都是因为ArcGIS Pro的功能框架完全不同,这里为大家介绍一下在ArcGIS Pro…...
如何解决态势感知中的“时隐时现”问题
解决态势感知中的“时隐时现”问题有以下几个方法: 1、确保所有关键的监控设备和传感器正常运行,能够及时和准确地检测到各种异常情况。 2、引入先进的技术手段。例如使用人工智能和机器学习算法来识别和分析大量的数据,快速发现异常和威胁&a…...

为什么JavaScript中0.1 + 0.2 ≠ 0.3
JavaScript中的浮点数运算有时候会出现一点偏差。下面解释为什么0.1 0.2 ≠ 0.3,以及如果你需要精确运算应该怎么做。 如果1 2 3,那么为什么在JavaScript中0.1 0.2 ≠ 0.3?这个原因与计算机科学和浮点数运算有关。 我建议你打开浏览器的控制台,输入0.1 0.2来查看结果。…...

Unity关于纹理图片格式带来的内存问题和对预制体批量格式和大小减半处理
我们经常会遇到内存问题,这次就是遇到很多图片的默认格式被改成了RGB32,导致Android打包后运行内存明显增加。 发生了什么 打包Android后,发现经常崩溃,明显内存可能除了问题,看了内存后发现了问题。 见下图…...

2024美赛数学建模思路 - 案例:ID3-决策树分类算法
文章目录 0 赛题思路1 算法介绍2 FP树表示法3 构建FP树4 实现代码 建模资料 0 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 1 算法介绍 FP-Tree算法全称是FrequentPattern Tree算法,就是频繁模…...

GitHub图床搭建
1 准备Github账号 如果没有Github账号需要先在官网注册一个账号 2 创建仓库 在github上创建一个仓库,随便一个普通的仓库就行,选择公共仓库 并且配置github仓库的pages,选择默认访问的分支及默认路径 3 github token获取 github token创…...

DQN、Double DQN、Dueling DQN、Per DQN、NoisyDQN 学习笔记
文章目录 DQN (Deep Q-Network)说明伪代码应用范围 Double DQN说明伪代码应用范围 Dueling DQN实现原理应用范围伪代码 Per DQN (Prioritized Experience Replay DQN)应用范围伪代码 NoisyDQN伪代码应用范围 部分内容与图片摘自:JoyRL 、 EasyRL DQN (Deep Q-Networ…...

C++ 编程需要什么样的开发环境?
C 编程需要什么样的开发环境? 在开始前我有一些资料,是我根据网友给的问题精心整理了一份「C的资料从专业入门到高级教程」, 点个关注在评论区回复“888”之后私信回复“888”,全部无偿共享给大家!!&#…...
Unity文字游戏开发日志(1)—— 打字机效果
作者是一名OIer,因为兴趣,想在寒假期间开发一款文字游戏的demo。 本博客仅用作记录,马蜂极度不符合规范。 但是,可以用来避坑。 1.等待功能——使用的是协程函数,且调用与常规调用函数不同。 private IEnumerator Sco(){isScoe…...

从0开始python学习-48.pytest框架之断言
目录 1. 响应进行断言 1.1 在yaml用例中写入断言内容 1.2 封装断言方法 1.3 在执行流程中加入断言判断内容 2. 数据库数据断言 2.1 在yaml用例中写入断言内容 2.2 连接数据库并封装执行sql的方法 2.3 封装后校验方法是否可执行 2.4 使用之前封装的断言方法,…...

学习JavaEE的日子 day13补 深入类加载机制及底层
深入类加载机制 初识类加载过程 使用某个类时,如果该类的class文件没有加载到内存时,则系统会通过以下三个步骤来对该类进行初始化 1.类的加载(Load) → 2.类的连接(Link) → 3.类的初始化(In…...

C# WebApi传参及Postman调试
概述 欢迎来到本文,本篇文章将会探讨C# WebApi中传递参数的方法。在WebApi中,参数传递是一个非常重要的概念,因为它使得我们能够从客户端获取数据,并将数据传递到服务器端进行处理。WebApi是一种使用HTTP协议进行通信的RESTful服…...

测试微信模版消息推送
进入“开发接口管理”--“公众平台测试账号”,无需申请公众账号、可在测试账号中体验并测试微信公众平台所有高级接口。 获取access_token: 自定义模版消息: 关注测试号:扫二维码关注测试号。 发送模版消息: import requests da…...

深入浅出Asp.Net Core MVC应用开发系列-AspNetCore中的日志记录
ASP.NET Core 是一个跨平台的开源框架,用于在 Windows、macOS 或 Linux 上生成基于云的新式 Web 应用。 ASP.NET Core 中的日志记录 .NET 通过 ILogger API 支持高性能结构化日志记录,以帮助监视应用程序行为和诊断问题。 可以通过配置不同的记录提供程…...
React Native 开发环境搭建(全平台详解)
React Native 开发环境搭建(全平台详解) 在开始使用 React Native 开发移动应用之前,正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南,涵盖 macOS 和 Windows 平台的配置步骤,如何在 Android 和 iOS…...

STM32F4基本定时器使用和原理详解
STM32F4基本定时器使用和原理详解 前言如何确定定时器挂载在哪条时钟线上配置及使用方法参数配置PrescalerCounter ModeCounter Periodauto-reload preloadTrigger Event Selection 中断配置生成的代码及使用方法初始化代码基本定时器触发DCA或者ADC的代码讲解中断代码定时启动…...
电脑插入多块移动硬盘后经常出现卡顿和蓝屏
当电脑在插入多块移动硬盘后频繁出现卡顿和蓝屏问题时,可能涉及硬件资源冲突、驱动兼容性、供电不足或系统设置等多方面原因。以下是逐步排查和解决方案: 1. 检查电源供电问题 问题原因:多块移动硬盘同时运行可能导致USB接口供电不足&#x…...
spring:实例工厂方法获取bean
spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂ÿ…...

新能源汽车智慧充电桩管理方案:新能源充电桩散热问题及消防安全监管方案
随着新能源汽车的快速普及,充电桩作为核心配套设施,其安全性与可靠性备受关注。然而,在高温、高负荷运行环境下,充电桩的散热问题与消防安全隐患日益凸显,成为制约行业发展的关键瓶颈。 如何通过智慧化管理手段优化散…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...

2025盘古石杯决赛【手机取证】
前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来,实在找不到,希望有大佬教一下我。 还有就会议时间,我感觉不是图片时间,因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)
安全领域各种资源,学习文档,以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具,欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...