Kafka消费者组
消费者总体工作流程

Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。
• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

消费者组初始化流程

1、coordinator:辅助实现消费者组的初始化和分区的分配。 coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets的分区数量) 例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator 作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset;
2、coordinator选出一个 consumer作为leader;
3、coordinator把要消费的topic情况发送给leader消费者;
4、leader会负责制定消费方案;
5、把消费方案发给coordinator;
6、Coordinator就把消费方 案下发给各个consumer;
7、每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的时间过长(max.poll.interval.ms5分钟),也会触发再平衡
消费者组详细消费流程

左侧为Kafka集群,右侧为消费者组,消费者创建网络连接客户端,消费者组调用sendFetches,抓取数据,同时还会准备两个参数,Fetch.min.bytes:每批次最小抓取大小,默认1字节,fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms,任一条件满足,都会拉取数据;Fetch.max.bytes每批次最 大抓取大小,默认50m
send->拉取数据将数据放进completedFetches队列,消费者一批次拉取默认500条进行处理:反序列化->拦截器->处理数据
package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {//配置Properties properties = new Properties();//链接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//1.创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2。订阅主题ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);//3.消费数据while(true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//拉数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

相关文章:
Kafka消费者组
消费者总体工作流程 Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。 • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费…...
四. 基于环视Camera的BEV感知算法-BEVDepth
目录 前言0. 简述1. 算法动机&开创性思路2. 主体结构3. 损失函数4. 性能对比总结下载链接参考 前言 自动驾驶之心推出的《国内首个BVE感知全栈系列学习教程》,链接。记录下个人学习笔记,仅供自己参考 本次课程我们来学习下课程第四章——基于环视Cam…...
CentOS系统环境搭建(二十五)——使用docker compose安装mysql
centos系统环境搭建专栏🔗点击跳转 文章目录 使用docker compose安装mysqlMySQL81.新建文件夹2.创建docker-compose.yaml3.创建my.cnf4.mysql容器的启动和关闭 MySQL5.71.新建文件夹2.创建docker-compose.yaml3.创建my.cnf4.mysql容器的启动和关闭 使用docker comp…...
协作机器人(Collaborative-Robot)安全碰撞的速度与接触力
协作机器人(Collaborative-Robot)的安全碰撞速度和接触力是一个非常重要的安全指标。在设计和使用协作机器人时,必须确保其与人类或其他物体的碰撞不会对人员造成伤害。 对于协作机器人的安全碰撞速度,一般会设定一个上限值&…...
第11章 GUI Page400~402 步骤二 画直线
运行效果: 源代码: /**************************************************************** Name: wxMyPainterApp.h* Purpose: Defines Application Class* Author: yanzhenxi (3065598272qq.com)* Created: 2023-12-21* Copyright: yanzhen…...
华为gre隧道全部跑静态路由
最终实现: 1、pc1能用nat上网ping能pc3 2、pc1能通过gre访问pc2 3、全部用静态路由做,没有用ospf,如果要用ospf,那么两边除了路由器上跑ospf,核心交换机也得用ospf r2配置: acl number 3000 rule 5 deny…...
【c++】入门1
c关键字 命名空间 在C/C中,变量、函数和后面要学到的类都是大量存在的,这些变量、函数和类的名称将都存在于全局作用域中,可能会导致很多冲突。使用命名空间的目的是对标识符的名称进行本地化,以避免命名冲突或名字污染ÿ…...
Python之Django项目的功能配置
1.创建Django项目 进入项目管理目录,比如:D盘 执行命令:diango-admin startproject demo1 创建项目 如果提示diango命令不存在,搜索diango-admin程序的位置,然后加入到环境变量path中。 进入项目,cd demo…...
P4 音频知识点——PCM音频原始数据
目录 前言 01 PCM音频原始数据 1.1 频率 1.2 振幅: 1.3 比特率 1.4 采样 1.5 量化 1.6 编码 02. PCM数据有以下重要的参数: 采样率: 采集深度 通道数 PCM比特率 PCM文件大小计算: …...
解决Electron中WebView加载部分HTTPS页面白屏的方法
Electron是一个开源的桌面应用程序框架,它允许使用Web技术构建跨平台的桌面应用。在Electron应用中,WebView 是一个常用的组件,用于嵌套加载Web内容。然而,有时候在加载使用 HTTPS 协议的页面时,可能会因为证书问题导致…...
【Java中创建对象的方式有哪些?】
✅Java中创建对象的方式有哪些? ✅使用New关键字✅使用反射机制✅使用clone方法✅使用反序列化✅使用方法句柄✅ 使用Unsafe分配内存 ✅使用New关键字 这是我们最常见的也是最简单的创建对象的方式,通过这种方式我们还可以调用任意的构造函数 (无参的和有…...
npm使用详解(好吧好吧是粗解)
目录 npm是什么? npm有什么用? npm安装 在 Windows 上 在 macOS 上 在 Linux 上(使用 apt 包管理器为例) 验证 npm 安装成功: npm使用 1. 初始化项目: 2. 安装和管理依赖: 3. 查看和…...
uniapp自定义头部导航怎么实现?
一、在pages.json文件里边写上自定义属性 "navigationStyle": "custom" 二、在对应的index页面写上以下: <view :style"{ height: headheight px, backgroundColor: #24B7FF, zIndex: 99, position: fixed, top: 0px, width: 100% …...
什么是 Dubbo?它有哪些核心功能?
文章目录 什么是 Dubbo?它有哪些核心功能? 什么是 Dubbo?它有哪些核心功能? Dubbo 是一款高性能、轻量级的开源 RPC 框架。由 10 层模式构成,整个分层依赖由上至下。 通过这张图我们也可以将 Dubbo 理解为三层模式&…...
(2021|CoRR,AugCLIP,优化)FuseDream:通过改进的 CLIP+GAN 空间优化实现免训练文本到图像生成
FuseDream: Training-Free Text-to-Image Generation with Improved CLIPGAN Space Optimization 公众:EDPJ(添加 VX:CV_EDPJ 或直接进 Q 交流群:922230617 获取资料) 目录 0. 摘要 1. 简介 2. CLIPGAN 文本到图…...
python pip安装依赖的常用软件源
目录 引言 一、什么是镜像源? 二、清华源 三、阿里源 四、中科大源 五、豆瓣源 六、更多资源 引言 在软件开发和使用过程中,我们经常需要下载和更新各种软件包和库文件。然而,由于网络环境的限制或者服务器的负载&#…...
避免大M取值过大引起的数值问题
在数学建模当中,常常会见到大M法,它之所以叫大M法,是因为它涉及到一个(绝对值)较大的系数M,这个大M的值应大于约束中的连续变量或者约束表达式可能取到的任何合理值,M值取过大往往会造成优化问题…...
史密斯圆图的使用
史密斯圆图的使用 简介识别史密斯圆图等反射系数圆归一化阻抗圆导纳圆图史密斯圆图的使用单支匹配双支匹配简介 史密斯图Smith Chart是电气工程,无线电,射频工程,微波工程和通信等领域常用的一种图示工具,用于分析和设计传输线和阻抗匹配网络,它由美国工程师Phillip H.Sm…...
可重复读解决了哪些问题? 对 SQL 慢查询会考虑哪些优化 ?
文章目录 可重复读解决了哪些问题?对 SQL 慢查询会考虑哪些优化 ? 可重复读解决了哪些问题? (1)可重复读的核心就是一致性读(consistent read);保证多次读取同一个数据时,其值都和事务开始时候的内容是一致…...
从0开始python学习-35.allure报告企业定制
目录 1. 搭建allure环境 2. 生成报告 3. logo定制 4. 企业级报告内容或层级定制 5. allure局域网查看 1. 搭建allure环境 1.1 JDK,使用PyCharm 找到pycharm安装目录找到java.exe记下jbr目录的完整路径,eg: C:\Program Files\JetBrains\PyCharm Com…...
DanKoe 视频笔记:写作技能:掌握写作,驾驭未来十年
概述 在本节课中,我们将要学习为什么写作是未来十年最重要的元技能,以及如何通过一个清晰的六步框架和一套实用的写作方法,开启你的个人写作事业。我们将探讨写作如何放大你的其他技能,并为你提供一套从零开始构建影响力的具体行…...
基于GPT-5.4的本科毕业论文智能写作实战指南:从实验数据到完稿的全流程教程
摘要: 对于已完成实验并手握参考文献的大四学生而言,将 months of experiments 转化为符合学术规范的毕业论文往往是最具挑战性的环节。本教程系统介绍如何利用GPT-5.4这一先进的大语言模型,通过科学的提示词工程(Prompt Engineer…...
AI视频生成工具ComfyUI-WanVideoWrapper零基础配置指南
AI视频生成工具ComfyUI-WanVideoWrapper零基础配置指南 【免费下载链接】ComfyUI-WanVideoWrapper 项目地址: https://gitcode.com/GitHub_Trending/co/ComfyUI-WanVideoWrapper 还在为视频生成工具的复杂配置烦恼?想快速掌握AI视频创作却被技术门槛劝退&am…...
DNF联机搭建避坑指南:从‘花枝登录器’授权到PVF加密的全流程解析
DNF私服联机搭建实战:从授权配置到加密通信的完整解决方案 当几个朋友想搭建一个私人DNF服务器享受联机乐趣时,最令人头疼的往往不是服务端的启动,而是如何让客户端顺利连接。本文将聚焦于那些让"单机变联机"的关键技术环节——登录…...
WPF颜色转换器实战:如何用ConverterParameter动态切换UI主题色(附完整代码)
WPF颜色转换器实战:如何用ConverterParameter动态切换UI主题色(附完整代码) 在WPF应用开发中,动态主题切换是提升用户体验的关键功能之一。想象一下,你的应用能够根据用户偏好或系统设置实时切换明暗主题,甚…...
轻量NAS整合:OpenClaw+nanobot自动同步群晖文件的配置方法
轻量NAS整合:OpenClawnanobot自动同步群晖文件的配置方法 1. 为什么需要自动化文件管理 作为一个长期使用群晖NAS的用户,我经常遇到这样的困扰:下载文件夹里堆满了各种文件,手动分类整理耗时耗力;重要文档的版本管理…...
新手必看!用Python+OpenCV实现简易版视觉里程计(附车道线检测代码)
PythonOpenCV实战:从车道线检测到简易视觉里程计 在自动驾驶和机器人导航领域,视觉里程计(VO)是一项基础而关键的技术。它像是一双"数字眼睛",通过分析连续图像帧之间的变化来估算设备的运动轨迹。想象一下,当你闭着眼…...
从国赛真题到实战演练:蓝桥杯CTF网络安全竞赛核心题型深度剖析
1. 逆向工程实战:从加密程序到Flag还原 去年蓝桥杯CTF国赛的第一道逆向题让不少选手印象深刻。题目给出一个名为encodefile的可执行程序和一个加密后的数据文件enc.dat,要求还原原始flag内容。这类题型在CTF中非常典型,主要考察选手对程序逻辑…...
Windows 系统下通过 composer 快速搭建 ThinkPHP6 开发环境及实战配置指南
1. 环境准备:Windows下搭建ThinkPHP6的基础条件 在Windows系统下搭建ThinkPHP6开发环境,首先需要确保基础软件栈的完整性。我遇到过不少新手开发者直接跳过了环境检查环节,结果在后续步骤中频繁报错。这里分享几个必须提前准备好的关键组件&a…...
comsol仿真建模 由于结构本身的复杂性,很难对实际多孔结构中的流动进行建模。 在实际应用中...
comsol仿真建模 由于结构本身的复杂性,很难对实际多孔结构中的流动进行建模。 在实际应用中,详细求解流场不可行。 因此,使用了利用多孔结构平均物理量 (如孔隙率和渗透率)的宏观方法。 本例详细分析孔隙尺度的流场&am…...
