当前位置: 首页 > news >正文

golang kafka sarama 源码解析

  • 消费者组重平衡

github.com/!shopify/sarama@v1.27.2/consumer_group.go

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err := c.client.Coordinator(c.groupID)if err != nil {if retries <= 0 {return nil, err}return c.retryNewSession(ctx, topics, handler, retries, true)}// 申请加入组// Join consumer groupjoin, err := c.joinGroupRequest(coordinator, topics)if err != nil {_ = coordinator.Close()return nil, err}switch join.Err {case ErrNoError:c.memberID = join.MemberIdcase ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, true)// 已经在重平衡期间case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, join.Err}// 消费者组中的一个消费者作为leader,进行分区方案分配// Prepare distribution plan if we joined as the leadervar plan BalanceStrategyPlanif join.LeaderId == join.MemberId {members, err := join.GetMembers()if err != nil {return nil, err}// 分配分区plan, err = c.balance(members)if err != nil {return nil, err}}// 同步给kafka,只有 leader会带上分区方案// Sync consumer groupgroupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)if err != nil {_ = coordinator.Close()return nil, err}switch groupRequest.Err {case ErrNoError:case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, true)case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, groupRequest.Err}// Retrieve and sort claimsvar claims map[string][]int32 // topic->partions// 如果有可消费的分区if len(groupRequest.MemberAssignment) > 0 {members, err := groupRequest.GetMemberAssignment()if err != nil {return nil, err}claims = members.Topicsc.userData = members.UserDatafor _, partitions := range claims {sort.Sort(int32Slice(partitions))}}return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
  • 消费者拉取消息
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {bc := &brokerConsumer{consumer:         c,broker:           broker,input:            make(chan *partitionConsumer),newSubscriptions: make(chan []*partitionConsumer),wait:             make(chan none),subscriptions:    make(map[*partitionConsumer]none),refs:             0,}go withRecover(bc.subscriptionManager)go withRecover(bc.subscriptionConsumer)return bc
}

相关文章:

golang kafka sarama 源码解析

消费者组重平衡 github.com/!shopify/saramav1.27.2/consumer_group.go func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err :…...

Flutter知识点整理

JVM 1.Flutter Hot reload 实现原理 一、原理概述 Hot Reload 只能在 Debug 模式下使用&#xff0c;因为 Debug 模式下&#xff0c;Flutter 采用的是 JIT&#xff08; 动态编译&#xff09;&#xff0c;代码是运行在 Dart VM 上&#xff0c;JIT 将 Dart 编译成可以运行在 Dart…...

现代游戏引擎架构

一、并行编程 1.1 为什么需要并行编程 游戏的渲染计算对算力要求很高&#xff0c;所以我们需要把操作系统的资源利用到极致。 但是摩尔定律已经不在适用了&#xff0c;硬件的发展目前已经达到瓶颈。所以我们需要通过数量来提高计算效率。 1.2 并行编程基础 进程与线程&#…...

深度学习:复杂工业场景下的复杂缺陷检测方法

摘要&#xff1a;在复杂的工业场景中&#xff0c;缺陷检测一直是一个重要而具有挑战性的任务。近年来&#xff0c;深度学习技术的快速发展为复杂工业场景下的缺陷检测提供了新的解决方案。本文将介绍深度学习在复杂工业场景下的复杂缺陷检测中的应用&#xff0c;并探讨其技术进…...

CSDN个人简介优化 html font属性

CSDN个人简介优化 html font属性 个人简介个人简介优化字体21种样式选择字体大小设置4号字体 字体颜色设计渐变色&#xff08;可惜不能显示&#xff09; 字体加粗设置 <b>标签 个人简介 &#x1f308;你好呀&#xff01;我是 是Yu欸 &#x1f30c; 2024每日百字篆刻时光…...

从哈希桶角度看 unordered_map 与 unordered_set 的实现

文章目录 一、引言二、C unordered系列的无序关联式容器概览三、基于哈希桶的C unordered系列数据结构模拟实现1、unordered_map的模拟实现2、unordered_set的模拟实现3、哈希桶及其迭代器实现的代码 四、扩展与应用1. 自定义哈希函数2. 其他unordered数据结构unordered_multim…...

飞天使-k8s知识点27-kubernetes温故知新2-deployment

文章目录 RC和RS无状态应用管理 deployment有状态应用statefulSetdaemonSet RC和RS RC不会使用在生产环境 RS 比RC 多了标签选择器 &#xff0c;RS 用deployment管理&#xff0c;用于容器编排无状态应用管理 deployment apiVersion: apps/v1 kind: Deployment metadata:name:…...

手机网页关键词视频爬虫采集软件可导出视频分享链接|视频无水印批量下载工具

全新音视频批量下载工具&#xff0c;为您解放视频管理烦恼&#xff01; 现如今&#xff0c;音上涌现出大量精彩的视频内容&#xff0c;但是要想高效地获取、管理和分享这些视频却是一件颇具挑战的事情。针对这一难题&#xff0c;我们自主研发了全新的音视频批量下载工具&#x…...

基于OpenCV的图像处理案例之图像矫正(Python)

Index 目录索引 写在前面解决思路参考 写在前面 本文通过一个案例介绍如何使用OpenCV将倾斜的扫描文档图像进行水平矫正。 解决思路 因为扫描图像中的大部分文字倾斜后&#xff0c;同一行文字也在同一条直线&#xff0c;所以可以通过拟合直线来计算文本倾斜角度&#xff0c;…...

创建linux虚拟机系统:(安装Ubuntu镜像文件,包含语言设置、中文输入法、时间设置)

我下载的是清华大写开源软件镜像站中的ubuntu-20.04.6-desktop-amd64.iso这个镜像文件&#xff0c; 这个文件我下载完成之后没有解压&#xff0c;直接在创建虚拟机的时候选择的压缩包。 地址为&#xff1a;Index of /ubuntu-releases/20.04/ | 清华大学开源软件镜像站 | Tsin…...

3.0 V-22V 宽输入电压,高效率异步升压芯片-ZCC5429

一、产品综述 ZCC5429 芯片是一款自动调频、最高 600KHz工作频率、高效率、宽输入电压范围的电流模式异 步升压&#xff08;BOOST&#xff09;芯片&#xff0c;且可调输入限流功能。用户可灵活地通过外部补偿建立动态环路&#xff0c;获得在所有条件下最优瞬态性能。 ZCC542…...

Sphinx + Readthedocs 避坑速通指南

博主在学习使用 Sphinx 和 Read the docs 的过程中&#xff0c; 碰到了许多奇葩的 bug, 使得很简单的任务花费了很长的时间才解决&#xff0c;现在在这里做一个分享&#xff0c;帮助大家用更少的时间高效上线文档的内容。 总的来说&#xff0c; 任务分为两个部分&#xff1a; …...

IPP-7010 表面贴装 90 度混合耦合器

IPP-7010 表面贴装 90 度混合耦合器 IPP-7010 是一款表面贴装 90 度混合耦合器&#xff0c;工作频率为 800 至 2500 MHz&#xff08;0.8 至 2.5 GHz&#xff09;&#xff0c;平均额定功率为 200 瓦。IPP-7010 采用 0.40 x 1.80 英寸表面贴装封装。IPP-7010的幅度平衡小于0.6dB&…...

25.2 微服务Dubbo

25.2 微服务Dubbo 1. Dubbo简介2. RPC3. Dubbo工作原理4. 代码实操4.1 父项目1. 依赖4.2 服务提供者1. 依赖2. 配置文件3. 启动类4. 业务类4.3 服务消费者1. 依赖2. 配置文件3. 消费者启动类4. 业务:实现远程调用...

CI/CD环境搭建

服务简介 Gitlab 官网&#xff1a;https://about.gitlab.com/ GitLab 是一个用于仓库管理系统的开源项目&#xff0c;使用Git作为代码管理工具&#xff0c;并在此基础上搭建起来的Web服务。安装方法是参考GitLab在GitHub上的Wiki页面。Gitlab是被广泛使用的基于git的开源代码管…...

API调试管理工具Postman下载及操作介绍

1.下载安装postman地址&#xff1a;https://www.getpostman.com/downloads/ 2.创建项目 3.创建请求API 然后点击save保存api 4.用一个变量保存主域名&#xff0c;方便后续操作 就类似下面的baseurl 5.创建新环境 6.添加变量&#xff08;如添加本地测试环境url——ba…...

vue集成百度地图,实现关键字搜索并自定义覆盖物,保存成静态图片

vue集成百度地图&#xff0c;实现关键字搜索并自定义覆盖物 index.html引入百度地图js <script type"text/javascript" src"https://api.map.baidu.com/api?v2.0&typewebgl&akxxxxxxwMprS7jIfPt354VdgP"></script>vue页面代码 <…...

Java中的Stream流

一、介绍 1. Stream流的作用 结合了Lambda表达式&#xff0c;简化集合、数组的操作。 2. Stream流的使用步骤 ①先得到一条Stream流&#xff0c;并把数据放上去&#xff1b; 获取方式方法名说明单列集合default Stream<E> stream()Collection中的默认方法双列集合无无…...

前端UI怎么防止用户反复提交?

方法1&#xff1a;禁用按钮 用户点击“xxx”按钮后&#xff0c;先禁用按钮&#xff0c;防止用户多次点击&#xff1b;待请求完成后&#xff0c;再解禁按钮。 方法2&#xff1a;防抖&#xff08;Debouncing&#xff09; 防抖是一种技术&#xff0c;它可以延迟执行函数&#xff0…...

OpenHarmony游戏应用程序-实现的一个手柄游戏

介绍 本篇Codelab是基于TS扩展的声明式开发范式编程语言&#xff0c;以及OpenHarmony的分布式能力实现的一个手柄游戏。 说明&#xff1a; 本示例涉及使用系统接口&#xff0c;需要手动替换Full SDK才能编译通过。 完成本篇Codelab需要两台开发板&#xff0c;一台开发板作为游…...

硬盘分区数据彻底清除工具|支持多算法覆写擦除,确保文件销毁不可恢复

温馨提示&#xff1a;文末有联系方式工具核心功能说明 本工具专为硬盘分区级数据安全销毁设计&#xff0c;可对用户指定的整个磁盘分区执行底层覆盖式擦除&#xff0c;确保所有原始数据&#xff08;包括已删除文件残留、系统临时文件、隐藏扇区数据等&#xff09;被完全覆盖并失…...

Elsevier论文审稿状态追踪工具:让科研进度管理变得轻松

Elsevier论文审稿状态追踪工具&#xff1a;让科研进度管理变得轻松 【免费下载链接】Elsevier-Tracker 项目地址: https://gitcode.com/gh_mirrors/el/Elsevier-Tracker 还在为论文投稿后的漫长等待而焦虑吗&#xff1f;Elsevier论文审稿状态追踪工具是一款专为科研工作…...

MyBatis-Plus lambdaQuery条件构造器:EQ、NE、GT等操作符实战解析

1. 为什么需要lambdaQuery条件构造器 如果你用过MyBatis&#xff0c;肯定遇到过这样的场景&#xff1a;为了查询某个状态的数据&#xff0c;不得不写一堆if判断来拼接SQL。比如查询用户列表&#xff0c;要根据不同条件筛选&#xff0c;代码里全是"if(name!null){sql"…...

如何在TI-28388 DSP的CM核上快速搭建freeRTOS环境(附LED控制实战)

在TI-28388 DSP的CM核上构建freeRTOS开发环境的完整指南 1. 环境准备与硬件配置 在开始freeRTOS移植之前&#xff0c;我们需要确保开发环境配置正确。TI-28388 DSP是一款多核处理器&#xff0c;包含两个C28x核和一个ARM Cortex-M4核&#xff08;CM核&#xff09;。我们将专注于…...

终极指南:Mitsuba 3与Dr.Jit编译器如何用JIT技术重塑渲染管线

终极指南&#xff1a;Mitsuba 3与Dr.Jit编译器如何用JIT技术重塑渲染管线 【免费下载链接】mitsuba3 Mitsuba 3: A Retargetable Forward and Inverse Renderer 项目地址: https://gitcode.com/gh_mirrors/mi/mitsuba3 Mitsuba 3是一个革命性的研究导向渲染系统&#xf…...

SumatraPDF终极书签管理指南:从基础导航到高级技巧

SumatraPDF终极书签管理指南&#xff1a;从基础导航到高级技巧 【免费下载链接】sumatrapdf SumatraPDF reader 项目地址: https://gitcode.com/gh_mirrors/su/sumatrapdf SumatraPDF作为一款轻量级的多格式文档阅读器&#xff0c;在PDF书签管理方面提供了独特而实用的解…...

Honey Select 2游戏体验终极优化指南:HS2-HF_Patch完整解决方案

Honey Select 2游戏体验终极优化指南&#xff1a;HS2-HF_Patch完整解决方案 【免费下载链接】HS2-HF_Patch Automatically translate, uncensor and update HoneySelect2! 项目地址: https://gitcode.com/gh_mirrors/hs/HS2-HF_Patch 当你打开Honey Select 2时&#xff…...

SpringBoot3实战:JetCache多级缓存架构设计与性能优化

1. 为什么需要多级缓存架构 在电商、社交、内容平台等高并发场景中&#xff0c;数据库往往成为性能瓶颈。我去年参与的一个社区项目&#xff0c;在高峰期每秒要处理近万次用户动态查询&#xff0c;单纯依赖MySQL的QPS只能撑到2000左右。这时候缓存就成了救命稻草&#xff0c;但…...

CMake构建类型全解析:Debug、Release、RelWithDebInfo、MinSizeRel到底怎么选?

CMake构建类型全解析&#xff1a;Debug、Release、RelWithDebInfo、MinSizeRel到底怎么选&#xff1f; 在软件开发的世界里&#xff0c;构建类型的选择往往决定了最终产品的表现形态。就像摄影师会根据不同场景选择光圈大小一样&#xff0c;开发者也需要根据项目阶段和需求选择…...

R语言建模总“跑不通”?3步定位环境污染源:从.Rprofile到Sys.getenv()的深度诊断手册

第一章&#xff1a;R语言建模环境“跑不通”现象的典型表现与危害R语言建模环境中的“跑不通”并非指语法错误导致的立即报错&#xff0c;而是一类隐蔽性强、复现性差、定位困难的系统性失配问题。这类问题常在跨平台迁移、版本升级或协作开发中集中爆发&#xff0c;表面看似代…...