当前位置: 首页 > news >正文

Kafka消费者组

消费者总体工作流程

        Consumer Group(CG):消费者组,由多个consumer组成。形成一个消费者组的条件,是所有消费者的groupid相同。

• 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

• 消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

消费者组初始化流程

        1、coordinator:辅助实现消费者组的初始化和分区的分配。 coordinator节点选择 = groupid的hashcode值 % 50( __consumer_offsets的分区数量) 例如: groupid的hashcode值 = 1,1% 50 = 1,那么__consumer_offsets 主题的1号分区,在哪个broker上,就选择这个节点的coordinator 作为这个消费者组的老大。消费者组下的所有的消费者提交offset的时候就往这个分区去提交offset;

        2、coordinator选出一个 consumer作为leader;

        3、coordinator把要消费的topic情况发送给leader消费者;

        4、leader会负责制定消费方案;

        5、把消费方案发给coordinator;

        6、Coordinator就把消费方 案下发给各个consumer;

        7、每个消费者都会和coordinator保持心跳(默认3s),一旦超时 (session.timeout.ms=45s),该消费者会被移除,并触发再平衡; 或者消费者处理消息的时间过长(max.poll.interval.ms5分钟),也会触发再平衡

消费者组详细消费流程

        左侧为Kafka集群,右侧为消费者组,消费者创建网络连接客户端,消费者组调用sendFetches,抓取数据,同时还会准备两个参数,Fetch.min.bytes:每批次最小抓取大小,默认1字节,fetch.max.wait.ms一批数据最小值未达到的超时时间,默认500ms,任一条件满足,都会拉取数据;Fetch.max.bytes每批次最 大抓取大小,默认50m

        send->拉取数据将数据放进completedFetches队列,消费者一批次拉取默认500条进行处理:反序列化->拦截器->处理数据

package com.atguigu.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {//配置Properties properties = new Properties();//链接properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//1.创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2。订阅主题ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);//3.消费数据while(true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//拉数据for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

相关文章:

Kafka消费者组

消费者总体工作流程 Consumer Group&#xff08;CG&#xff09;&#xff1a;消费者组&#xff0c;由多个consumer组成。形成一个消费者组的条件&#xff0c;是所有消费者的groupid相同。 • 消费者组内每个消费者负责消费不同分区的数据&#xff0c;一个分区只能由一个组内消费…...

四. 基于环视Camera的BEV感知算法-BEVDepth

目录 前言0. 简述1. 算法动机&开创性思路2. 主体结构3. 损失函数4. 性能对比总结下载链接参考 前言 自动驾驶之心推出的《国内首个BVE感知全栈系列学习教程》&#xff0c;链接。记录下个人学习笔记&#xff0c;仅供自己参考 本次课程我们来学习下课程第四章——基于环视Cam…...

CentOS系统环境搭建(二十五)——使用docker compose安装mysql

centos系统环境搭建专栏&#x1f517;点击跳转 文章目录 使用docker compose安装mysqlMySQL81.新建文件夹2.创建docker-compose.yaml3.创建my.cnf4.mysql容器的启动和关闭 MySQL5.71.新建文件夹2.创建docker-compose.yaml3.创建my.cnf4.mysql容器的启动和关闭 使用docker comp…...

协作机器人(Collaborative-Robot)安全碰撞的速度与接触力

协作机器人&#xff08;Collaborative-Robot&#xff09;的安全碰撞速度和接触力是一个非常重要的安全指标。在设计和使用协作机器人时&#xff0c;必须确保其与人类或其他物体的碰撞不会对人员造成伤害。 对于协作机器人的安全碰撞速度&#xff0c;一般会设定一个上限值&…...

第11章 GUI Page400~402 步骤二 画直线

运行效果&#xff1a; 源代码&#xff1a; /**************************************************************** Name: wxMyPainterApp.h* Purpose: Defines Application Class* Author: yanzhenxi (3065598272qq.com)* Created: 2023-12-21* Copyright: yanzhen…...

华为gre隧道全部跑静态路由

最终实现&#xff1a; 1、pc1能用nat上网ping能pc3 2、pc1能通过gre访问pc2 3、全部用静态路由做&#xff0c;没有用ospf&#xff0c;如果要用ospf&#xff0c;那么两边除了路由器上跑ospf&#xff0c;核心交换机也得用ospf r2配置&#xff1a; acl number 3000 rule 5 deny…...

【c++】入门1

c关键字 命名空间 在C/C中&#xff0c;变量、函数和后面要学到的类都是大量存在的&#xff0c;这些变量、函数和类的名称将都存在于全局作用域中&#xff0c;可能会导致很多冲突。使用命名空间的目的是对标识符的名称进行本地化&#xff0c;以避免命名冲突或名字污染&#xff…...

Python之Django项目的功能配置

1.创建Django项目 进入项目管理目录&#xff0c;比如&#xff1a;D盘 执行命令&#xff1a;diango-admin startproject demo1 创建项目 如果提示diango命令不存在&#xff0c;搜索diango-admin程序的位置&#xff0c;然后加入到环境变量path中。 进入项目&#xff0c;cd demo…...

P4 音频知识点——PCM音频原始数据

目录 前言 01 PCM音频原始数据 1.1 频率 1.2 振幅&#xff1a; 1.3 比特率 1.4 采样 1.5 量化 1.6 编码 02. PCM数据有以下重要的参数&#xff1a; 采样率&#xff1a; 采集深度 通道数 ​​​​​​​ PCM比特率 ​​​​​​​ PCM文件大小计算&#xff1a; ​…...

解决Electron中WebView加载部分HTTPS页面白屏的方法

Electron是一个开源的桌面应用程序框架&#xff0c;它允许使用Web技术构建跨平台的桌面应用。在Electron应用中&#xff0c;WebView 是一个常用的组件&#xff0c;用于嵌套加载Web内容。然而&#xff0c;有时候在加载使用 HTTPS 协议的页面时&#xff0c;可能会因为证书问题导致…...

【Java中创建对象的方式有哪些?】

✅Java中创建对象的方式有哪些&#xff1f; ✅使用New关键字✅使用反射机制✅使用clone方法✅使用反序列化✅使用方法句柄✅ 使用Unsafe分配内存 ✅使用New关键字 这是我们最常见的也是最简单的创建对象的方式&#xff0c;通过这种方式我们还可以调用任意的构造函数 (无参的和有…...

npm使用详解(好吧好吧是粗解)

目录 npm是什么&#xff1f; npm有什么用&#xff1f; npm安装 在 Windows 上 在 macOS 上 在 Linux 上&#xff08;使用 apt 包管理器为例&#xff09; 验证 npm 安装成功&#xff1a; npm使用 1. 初始化项目&#xff1a; 2. 安装和管理依赖&#xff1a; 3. 查看和…...

uniapp自定义头部导航怎么实现?

一、在pages.json文件里边写上自定义属性 "navigationStyle": "custom" 二、在对应的index页面写上以下&#xff1a; <view :style"{ height: headheight px, backgroundColor: #24B7FF, zIndex: 99, position: fixed, top: 0px, width: 100% …...

什么是 Dubbo?它有哪些核心功能?

文章目录 什么是 Dubbo&#xff1f;它有哪些核心功能&#xff1f; 什么是 Dubbo&#xff1f;它有哪些核心功能&#xff1f; Dubbo 是一款高性能、轻量级的开源 RPC 框架。由 10 层模式构成&#xff0c;整个分层依赖由上至下。 通过这张图我们也可以将 Dubbo 理解为三层模式&…...

(2021|CoRR,AugCLIP,优化)FuseDream:通过改进的 CLIP+GAN 空间优化实现免训练文本到图像生成

FuseDream: Training-Free Text-to-Image Generation with Improved CLIPGAN Space Optimization 公众&#xff1a;EDPJ&#xff08;添加 VX&#xff1a;CV_EDPJ 或直接进 Q 交流群&#xff1a;922230617 获取资料&#xff09; 目录 0. 摘要 1. 简介 2. CLIPGAN 文本到图…...

python pip安装依赖的常用软件源

目录 引言 一、什么是镜像源&#xff1f;​​​​​​​ 二、清华源 三、阿里源 四、中科大源 五、豆瓣源 六、更多资源 引言 在软件开发和使用过程中&#xff0c;我们经常需要下载和更新各种软件包和库文件。然而&#xff0c;由于网络环境的限制或者服务器的负载&#…...

避免大M取值过大引起的数值问题

在数学建模当中&#xff0c;常常会见到大M法&#xff0c;它之所以叫大M法&#xff0c;是因为它涉及到一个&#xff08;绝对值&#xff09;较大的系数M&#xff0c;这个大M的值应大于约束中的连续变量或者约束表达式可能取到的任何合理值&#xff0c;M值取过大往往会造成优化问题…...

史密斯圆图的使用

史密斯圆图的使用 简介识别史密斯圆图等反射系数圆归一化阻抗圆导纳圆图史密斯圆图的使用单支匹配双支匹配简介 史密斯图Smith Chart是电气工程,无线电,射频工程,微波工程和通信等领域常用的一种图示工具,用于分析和设计传输线和阻抗匹配网络,它由美国工程师Phillip H.Sm…...

可重复读解决了哪些问题? 对 SQL 慢查询会考虑哪些优化 ?

文章目录 可重复读解决了哪些问题&#xff1f;对 SQL 慢查询会考虑哪些优化 &#xff1f; 可重复读解决了哪些问题&#xff1f; &#xff08;1&#xff09;可重复读的核心就是一致性读(consistent read);保证多次读取同一个数据时&#xff0c;其值都和事务开始时候的内容是一致…...

从0开始python学习-35.allure报告企业定制

目录 1. 搭建allure环境 2. 生成报告 3. logo定制 4. 企业级报告内容或层级定制 5. allure局域网查看 1. 搭建allure环境 1.1 JDK&#xff0c;使用PyCharm 找到pycharm安装目录找到java.exe记下jbr目录的完整路径&#xff0c;eg: C:\Program Files\JetBrains\PyCharm Com…...

Leetcode 3577. Count the Number of Computer Unlocking Permutations

Leetcode 3577. Count the Number of Computer Unlocking Permutations 1. 解题思路2. 代码实现 题目链接&#xff1a;3577. Count the Number of Computer Unlocking Permutations 1. 解题思路 这一题其实就是一个脑筋急转弯&#xff0c;要想要能够将所有的电脑解锁&#x…...

Linux简单的操作

ls ls 查看当前目录 ll 查看详细内容 ls -a 查看所有的内容 ls --help 查看方法文档 pwd pwd 查看当前路径 cd cd 转路径 cd .. 转上一级路径 cd 名 转换路径 …...

pikachu靶场通关笔记22-1 SQL注入05-1-insert注入(报错法)

目录 一、SQL注入 二、insert注入 三、报错型注入 四、updatexml函数 五、源码审计 六、insert渗透实战 1、渗透准备 2、获取数据库名database 3、获取表名table 4、获取列名column 5、获取字段 本系列为通过《pikachu靶场通关笔记》的SQL注入关卡(共10关&#xff0…...

Unity | AmplifyShaderEditor插件基础(第七集:平面波动shader)

目录 一、&#x1f44b;&#x1f3fb;前言 二、&#x1f608;sinx波动的基本原理 三、&#x1f608;波动起来 1.sinx节点介绍 2.vertexPosition 3.集成Vector3 a.节点Append b.连起来 4.波动起来 a.波动的原理 b.时间节点 c.sinx的处理 四、&#x1f30a;波动优化…...

A2A JS SDK 完整教程:快速入门指南

目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库&#xff…...

莫兰迪高级灰总结计划简约商务通用PPT模版

莫兰迪高级灰总结计划简约商务通用PPT模版&#xff0c;莫兰迪调色板清新简约工作汇报PPT模版&#xff0c;莫兰迪时尚风极简设计PPT模版&#xff0c;大学生毕业论文答辩PPT模版&#xff0c;莫兰迪配色总结计划简约商务通用PPT模版&#xff0c;莫兰迪商务汇报PPT模版&#xff0c;…...

宇树科技,改名了!

提到国内具身智能和机器人领域的代表企业&#xff0c;那宇树科技&#xff08;Unitree&#xff09;必须名列其榜。 最近&#xff0c;宇树科技的一项新变动消息在业界引发了不少关注和讨论&#xff0c;即&#xff1a; 宇树向其合作伙伴发布了一封公司名称变更函称&#xff0c;因…...

CVPR2025重磅突破:AnomalyAny框架实现单样本生成逼真异常数据,破解视觉检测瓶颈!

本文介绍了一种名为AnomalyAny的创新框架&#xff0c;该方法利用Stable Diffusion的强大生成能力&#xff0c;仅需单个正常样本和文本描述&#xff0c;即可生成逼真且多样化的异常样本&#xff0c;有效解决了视觉异常检测中异常样本稀缺的难题&#xff0c;为工业质检、医疗影像…...

云原生周刊:k0s 成为 CNCF 沙箱项目

开源项目推荐 HAMi HAMi&#xff08;原名 k8s‑vGPU‑scheduler&#xff09;是一款 CNCF Sandbox 级别的开源 K8s 中间件&#xff0c;通过虚拟化 GPU/NPU 等异构设备并支持内存、计算核心时间片隔离及共享调度&#xff0c;为容器提供统一接口&#xff0c;实现细粒度资源配额…...

机器学习的数学基础:线性模型

线性模型 线性模型的基本形式为&#xff1a; f ( x ) ω T x b f\left(\boldsymbol{x}\right)\boldsymbol{\omega}^\text{T}\boldsymbol{x}b f(x)ωTxb 回归问题 利用最小二乘法&#xff0c;得到 ω \boldsymbol{\omega} ω和 b b b的参数估计$ \boldsymbol{\hat{\omega}}…...