当前位置: 首页 > news >正文

golang kafka sarama 源码解析

  • 消费者组重平衡

github.com/!shopify/sarama@v1.27.2/consumer_group.go

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err := c.client.Coordinator(c.groupID)if err != nil {if retries <= 0 {return nil, err}return c.retryNewSession(ctx, topics, handler, retries, true)}// 申请加入组// Join consumer groupjoin, err := c.joinGroupRequest(coordinator, topics)if err != nil {_ = coordinator.Close()return nil, err}switch join.Err {case ErrNoError:c.memberID = join.MemberIdcase ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, true)// 已经在重平衡期间case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, join.Err}// 消费者组中的一个消费者作为leader,进行分区方案分配// Prepare distribution plan if we joined as the leadervar plan BalanceStrategyPlanif join.LeaderId == join.MemberId {members, err := join.GetMembers()if err != nil {return nil, err}// 分配分区plan, err = c.balance(members)if err != nil {return nil, err}}// 同步给kafka,只有 leader会带上分区方案// Sync consumer groupgroupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)if err != nil {_ = coordinator.Close()return nil, err}switch groupRequest.Err {case ErrNoError:case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, true)case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, groupRequest.Err}// Retrieve and sort claimsvar claims map[string][]int32 // topic->partions// 如果有可消费的分区if len(groupRequest.MemberAssignment) > 0 {members, err := groupRequest.GetMemberAssignment()if err != nil {return nil, err}claims = members.Topicsc.userData = members.UserDatafor _, partitions := range claims {sort.Sort(int32Slice(partitions))}}return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
  • 消费者拉取消息
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {bc := &brokerConsumer{consumer:         c,broker:           broker,input:            make(chan *partitionConsumer),newSubscriptions: make(chan []*partitionConsumer),wait:             make(chan none),subscriptions:    make(map[*partitionConsumer]none),refs:             0,}go withRecover(bc.subscriptionManager)go withRecover(bc.subscriptionConsumer)return bc
}

相关文章:

golang kafka sarama 源码解析

消费者组重平衡 github.com/!shopify/saramav1.27.2/consumer_group.go func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err :…...

Flutter知识点整理

JVM 1.Flutter Hot reload 实现原理 一、原理概述 Hot Reload 只能在 Debug 模式下使用&#xff0c;因为 Debug 模式下&#xff0c;Flutter 采用的是 JIT&#xff08; 动态编译&#xff09;&#xff0c;代码是运行在 Dart VM 上&#xff0c;JIT 将 Dart 编译成可以运行在 Dart…...

现代游戏引擎架构

一、并行编程 1.1 为什么需要并行编程 游戏的渲染计算对算力要求很高&#xff0c;所以我们需要把操作系统的资源利用到极致。 但是摩尔定律已经不在适用了&#xff0c;硬件的发展目前已经达到瓶颈。所以我们需要通过数量来提高计算效率。 1.2 并行编程基础 进程与线程&#…...

深度学习:复杂工业场景下的复杂缺陷检测方法

摘要&#xff1a;在复杂的工业场景中&#xff0c;缺陷检测一直是一个重要而具有挑战性的任务。近年来&#xff0c;深度学习技术的快速发展为复杂工业场景下的缺陷检测提供了新的解决方案。本文将介绍深度学习在复杂工业场景下的复杂缺陷检测中的应用&#xff0c;并探讨其技术进…...

CSDN个人简介优化 html font属性

CSDN个人简介优化 html font属性 个人简介个人简介优化字体21种样式选择字体大小设置4号字体 字体颜色设计渐变色&#xff08;可惜不能显示&#xff09; 字体加粗设置 <b>标签 个人简介 &#x1f308;你好呀&#xff01;我是 是Yu欸 &#x1f30c; 2024每日百字篆刻时光…...

从哈希桶角度看 unordered_map 与 unordered_set 的实现

文章目录 一、引言二、C unordered系列的无序关联式容器概览三、基于哈希桶的C unordered系列数据结构模拟实现1、unordered_map的模拟实现2、unordered_set的模拟实现3、哈希桶及其迭代器实现的代码 四、扩展与应用1. 自定义哈希函数2. 其他unordered数据结构unordered_multim…...

飞天使-k8s知识点27-kubernetes温故知新2-deployment

文章目录 RC和RS无状态应用管理 deployment有状态应用statefulSetdaemonSet RC和RS RC不会使用在生产环境 RS 比RC 多了标签选择器 &#xff0c;RS 用deployment管理&#xff0c;用于容器编排无状态应用管理 deployment apiVersion: apps/v1 kind: Deployment metadata:name:…...

手机网页关键词视频爬虫采集软件可导出视频分享链接|视频无水印批量下载工具

全新音视频批量下载工具&#xff0c;为您解放视频管理烦恼&#xff01; 现如今&#xff0c;音上涌现出大量精彩的视频内容&#xff0c;但是要想高效地获取、管理和分享这些视频却是一件颇具挑战的事情。针对这一难题&#xff0c;我们自主研发了全新的音视频批量下载工具&#x…...

基于OpenCV的图像处理案例之图像矫正(Python)

Index 目录索引 写在前面解决思路参考 写在前面 本文通过一个案例介绍如何使用OpenCV将倾斜的扫描文档图像进行水平矫正。 解决思路 因为扫描图像中的大部分文字倾斜后&#xff0c;同一行文字也在同一条直线&#xff0c;所以可以通过拟合直线来计算文本倾斜角度&#xff0c;…...

创建linux虚拟机系统:(安装Ubuntu镜像文件,包含语言设置、中文输入法、时间设置)

我下载的是清华大写开源软件镜像站中的ubuntu-20.04.6-desktop-amd64.iso这个镜像文件&#xff0c; 这个文件我下载完成之后没有解压&#xff0c;直接在创建虚拟机的时候选择的压缩包。 地址为&#xff1a;Index of /ubuntu-releases/20.04/ | 清华大学开源软件镜像站 | Tsin…...

3.0 V-22V 宽输入电压,高效率异步升压芯片-ZCC5429

一、产品综述 ZCC5429 芯片是一款自动调频、最高 600KHz工作频率、高效率、宽输入电压范围的电流模式异 步升压&#xff08;BOOST&#xff09;芯片&#xff0c;且可调输入限流功能。用户可灵活地通过外部补偿建立动态环路&#xff0c;获得在所有条件下最优瞬态性能。 ZCC542…...

Sphinx + Readthedocs 避坑速通指南

博主在学习使用 Sphinx 和 Read the docs 的过程中&#xff0c; 碰到了许多奇葩的 bug, 使得很简单的任务花费了很长的时间才解决&#xff0c;现在在这里做一个分享&#xff0c;帮助大家用更少的时间高效上线文档的内容。 总的来说&#xff0c; 任务分为两个部分&#xff1a; …...

IPP-7010 表面贴装 90 度混合耦合器

IPP-7010 表面贴装 90 度混合耦合器 IPP-7010 是一款表面贴装 90 度混合耦合器&#xff0c;工作频率为 800 至 2500 MHz&#xff08;0.8 至 2.5 GHz&#xff09;&#xff0c;平均额定功率为 200 瓦。IPP-7010 采用 0.40 x 1.80 英寸表面贴装封装。IPP-7010的幅度平衡小于0.6dB&…...

25.2 微服务Dubbo

25.2 微服务Dubbo 1. Dubbo简介2. RPC3. Dubbo工作原理4. 代码实操4.1 父项目1. 依赖4.2 服务提供者1. 依赖2. 配置文件3. 启动类4. 业务类4.3 服务消费者1. 依赖2. 配置文件3. 消费者启动类4. 业务:实现远程调用...

CI/CD环境搭建

服务简介 Gitlab 官网&#xff1a;https://about.gitlab.com/ GitLab 是一个用于仓库管理系统的开源项目&#xff0c;使用Git作为代码管理工具&#xff0c;并在此基础上搭建起来的Web服务。安装方法是参考GitLab在GitHub上的Wiki页面。Gitlab是被广泛使用的基于git的开源代码管…...

API调试管理工具Postman下载及操作介绍

1.下载安装postman地址&#xff1a;https://www.getpostman.com/downloads/ 2.创建项目 3.创建请求API 然后点击save保存api 4.用一个变量保存主域名&#xff0c;方便后续操作 就类似下面的baseurl 5.创建新环境 6.添加变量&#xff08;如添加本地测试环境url——ba…...

vue集成百度地图,实现关键字搜索并自定义覆盖物,保存成静态图片

vue集成百度地图&#xff0c;实现关键字搜索并自定义覆盖物 index.html引入百度地图js <script type"text/javascript" src"https://api.map.baidu.com/api?v2.0&typewebgl&akxxxxxxwMprS7jIfPt354VdgP"></script>vue页面代码 <…...

Java中的Stream流

一、介绍 1. Stream流的作用 结合了Lambda表达式&#xff0c;简化集合、数组的操作。 2. Stream流的使用步骤 ①先得到一条Stream流&#xff0c;并把数据放上去&#xff1b; 获取方式方法名说明单列集合default Stream<E> stream()Collection中的默认方法双列集合无无…...

前端UI怎么防止用户反复提交?

方法1&#xff1a;禁用按钮 用户点击“xxx”按钮后&#xff0c;先禁用按钮&#xff0c;防止用户多次点击&#xff1b;待请求完成后&#xff0c;再解禁按钮。 方法2&#xff1a;防抖&#xff08;Debouncing&#xff09; 防抖是一种技术&#xff0c;它可以延迟执行函数&#xff0…...

OpenHarmony游戏应用程序-实现的一个手柄游戏

介绍 本篇Codelab是基于TS扩展的声明式开发范式编程语言&#xff0c;以及OpenHarmony的分布式能力实现的一个手柄游戏。 说明&#xff1a; 本示例涉及使用系统接口&#xff0c;需要手动替换Full SDK才能编译通过。 完成本篇Codelab需要两台开发板&#xff0c;一台开发板作为游…...

基于距离变化能量开销动态调整的WSN低功耗拓扑控制开销算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.核心程序 4.算法仿真参数 5.算法理论概述 6.参考文献 7.完整程序 1.程序功能描述 通过动态调整节点通信的能量开销&#xff0c;平衡网络负载&#xff0c;延长WSN生命周期。具体通过建立基于距离的能量消耗模型&am…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

23-Oracle 23 ai 区块链表(Blockchain Table)

小伙伴有没有在金融强合规的领域中遇见&#xff0c;必须要保持数据不可变&#xff0c;管理员都无法修改和留痕的要求。比如医疗的电子病历中&#xff0c;影像检查检验结果不可篡改行的&#xff0c;药品追溯过程中数据只可插入无法删除的特性需求&#xff1b;登录日志、修改日志…...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路

进入2025年以来&#xff0c;尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断&#xff0c;但全球市场热度依然高涨&#xff0c;入局者持续增加。 以国内市场为例&#xff0c;天眼查专业版数据显示&#xff0c;截至5月底&#xff0c;我国现存在业、存续状态的机器人相关企…...

el-switch文字内置

el-switch文字内置 效果 vue <div style"color:#ffffff;font-size:14px;float:left;margin-bottom:5px;margin-right:5px;">自动加载</div> <el-switch v-model"value" active-color"#3E99FB" inactive-color"#DCDFE6"…...

第25节 Node.js 断言测试

Node.js的assert模块主要用于编写程序的单元测试时使用&#xff0c;通过断言可以提早发现和排查出错误。 稳定性: 5 - 锁定 这个模块可用于应用的单元测试&#xff0c;通过 require(assert) 可以使用这个模块。 assert.fail(actual, expected, message, operator) 使用参数…...

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...

大语言模型(LLM)中的KV缓存压缩与动态稀疏注意力机制设计

随着大语言模型&#xff08;LLM&#xff09;参数规模的增长&#xff0c;推理阶段的内存占用和计算复杂度成为核心挑战。传统注意力机制的计算复杂度随序列长度呈二次方增长&#xff0c;而KV缓存的内存消耗可能高达数十GB&#xff08;例如Llama2-7B处理100K token时需50GB内存&a…...

GitHub 趋势日报 (2025年06月06日)

&#x1f4ca; 由 TrendForge 系统生成 | &#x1f310; https://trendforge.devlive.org/ &#x1f310; 本日报中的项目描述已自动翻译为中文 &#x1f4c8; 今日获星趋势图 今日获星趋势图 590 cognee 551 onlook 399 project-based-learning 348 build-your-own-x 320 ne…...