当前位置: 首页 > news >正文

golang kafka sarama 源码解析

  • 消费者组重平衡

github.com/!shopify/sarama@v1.27.2/consumer_group.go

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err := c.client.Coordinator(c.groupID)if err != nil {if retries <= 0 {return nil, err}return c.retryNewSession(ctx, topics, handler, retries, true)}// 申请加入组// Join consumer groupjoin, err := c.joinGroupRequest(coordinator, topics)if err != nil {_ = coordinator.Close()return nil, err}switch join.Err {case ErrNoError:c.memberID = join.MemberIdcase ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, true)// 已经在重平衡期间case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, join.Err}// 消费者组中的一个消费者作为leader,进行分区方案分配// Prepare distribution plan if we joined as the leadervar plan BalanceStrategyPlanif join.LeaderId == join.MemberId {members, err := join.GetMembers()if err != nil {return nil, err}// 分配分区plan, err = c.balance(members)if err != nil {return nil, err}}// 同步给kafka,只有 leader会带上分区方案// Sync consumer groupgroupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)if err != nil {_ = coordinator.Close()return nil, err}switch groupRequest.Err {case ErrNoError:case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, true)case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, groupRequest.Err}// Retrieve and sort claimsvar claims map[string][]int32 // topic->partions// 如果有可消费的分区if len(groupRequest.MemberAssignment) > 0 {members, err := groupRequest.GetMemberAssignment()if err != nil {return nil, err}claims = members.Topicsc.userData = members.UserDatafor _, partitions := range claims {sort.Sort(int32Slice(partitions))}}return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
  • 消费者拉取消息
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {bc := &brokerConsumer{consumer:         c,broker:           broker,input:            make(chan *partitionConsumer),newSubscriptions: make(chan []*partitionConsumer),wait:             make(chan none),subscriptions:    make(map[*partitionConsumer]none),refs:             0,}go withRecover(bc.subscriptionManager)go withRecover(bc.subscriptionConsumer)return bc
}

相关文章:

golang kafka sarama 源码解析

消费者组重平衡 github.com/!shopify/saramav1.27.2/consumer_group.go func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err :…...

Flutter知识点整理

JVM 1.Flutter Hot reload 实现原理 一、原理概述 Hot Reload 只能在 Debug 模式下使用&#xff0c;因为 Debug 模式下&#xff0c;Flutter 采用的是 JIT&#xff08; 动态编译&#xff09;&#xff0c;代码是运行在 Dart VM 上&#xff0c;JIT 将 Dart 编译成可以运行在 Dart…...

现代游戏引擎架构

一、并行编程 1.1 为什么需要并行编程 游戏的渲染计算对算力要求很高&#xff0c;所以我们需要把操作系统的资源利用到极致。 但是摩尔定律已经不在适用了&#xff0c;硬件的发展目前已经达到瓶颈。所以我们需要通过数量来提高计算效率。 1.2 并行编程基础 进程与线程&#…...

深度学习:复杂工业场景下的复杂缺陷检测方法

摘要&#xff1a;在复杂的工业场景中&#xff0c;缺陷检测一直是一个重要而具有挑战性的任务。近年来&#xff0c;深度学习技术的快速发展为复杂工业场景下的缺陷检测提供了新的解决方案。本文将介绍深度学习在复杂工业场景下的复杂缺陷检测中的应用&#xff0c;并探讨其技术进…...

CSDN个人简介优化 html font属性

CSDN个人简介优化 html font属性 个人简介个人简介优化字体21种样式选择字体大小设置4号字体 字体颜色设计渐变色&#xff08;可惜不能显示&#xff09; 字体加粗设置 <b>标签 个人简介 &#x1f308;你好呀&#xff01;我是 是Yu欸 &#x1f30c; 2024每日百字篆刻时光…...

从哈希桶角度看 unordered_map 与 unordered_set 的实现

文章目录 一、引言二、C unordered系列的无序关联式容器概览三、基于哈希桶的C unordered系列数据结构模拟实现1、unordered_map的模拟实现2、unordered_set的模拟实现3、哈希桶及其迭代器实现的代码 四、扩展与应用1. 自定义哈希函数2. 其他unordered数据结构unordered_multim…...

飞天使-k8s知识点27-kubernetes温故知新2-deployment

文章目录 RC和RS无状态应用管理 deployment有状态应用statefulSetdaemonSet RC和RS RC不会使用在生产环境 RS 比RC 多了标签选择器 &#xff0c;RS 用deployment管理&#xff0c;用于容器编排无状态应用管理 deployment apiVersion: apps/v1 kind: Deployment metadata:name:…...

手机网页关键词视频爬虫采集软件可导出视频分享链接|视频无水印批量下载工具

全新音视频批量下载工具&#xff0c;为您解放视频管理烦恼&#xff01; 现如今&#xff0c;音上涌现出大量精彩的视频内容&#xff0c;但是要想高效地获取、管理和分享这些视频却是一件颇具挑战的事情。针对这一难题&#xff0c;我们自主研发了全新的音视频批量下载工具&#x…...

基于OpenCV的图像处理案例之图像矫正(Python)

Index 目录索引 写在前面解决思路参考 写在前面 本文通过一个案例介绍如何使用OpenCV将倾斜的扫描文档图像进行水平矫正。 解决思路 因为扫描图像中的大部分文字倾斜后&#xff0c;同一行文字也在同一条直线&#xff0c;所以可以通过拟合直线来计算文本倾斜角度&#xff0c;…...

创建linux虚拟机系统:(安装Ubuntu镜像文件,包含语言设置、中文输入法、时间设置)

我下载的是清华大写开源软件镜像站中的ubuntu-20.04.6-desktop-amd64.iso这个镜像文件&#xff0c; 这个文件我下载完成之后没有解压&#xff0c;直接在创建虚拟机的时候选择的压缩包。 地址为&#xff1a;Index of /ubuntu-releases/20.04/ | 清华大学开源软件镜像站 | Tsin…...

3.0 V-22V 宽输入电压,高效率异步升压芯片-ZCC5429

一、产品综述 ZCC5429 芯片是一款自动调频、最高 600KHz工作频率、高效率、宽输入电压范围的电流模式异 步升压&#xff08;BOOST&#xff09;芯片&#xff0c;且可调输入限流功能。用户可灵活地通过外部补偿建立动态环路&#xff0c;获得在所有条件下最优瞬态性能。 ZCC542…...

Sphinx + Readthedocs 避坑速通指南

博主在学习使用 Sphinx 和 Read the docs 的过程中&#xff0c; 碰到了许多奇葩的 bug, 使得很简单的任务花费了很长的时间才解决&#xff0c;现在在这里做一个分享&#xff0c;帮助大家用更少的时间高效上线文档的内容。 总的来说&#xff0c; 任务分为两个部分&#xff1a; …...

IPP-7010 表面贴装 90 度混合耦合器

IPP-7010 表面贴装 90 度混合耦合器 IPP-7010 是一款表面贴装 90 度混合耦合器&#xff0c;工作频率为 800 至 2500 MHz&#xff08;0.8 至 2.5 GHz&#xff09;&#xff0c;平均额定功率为 200 瓦。IPP-7010 采用 0.40 x 1.80 英寸表面贴装封装。IPP-7010的幅度平衡小于0.6dB&…...

25.2 微服务Dubbo

25.2 微服务Dubbo 1. Dubbo简介2. RPC3. Dubbo工作原理4. 代码实操4.1 父项目1. 依赖4.2 服务提供者1. 依赖2. 配置文件3. 启动类4. 业务类4.3 服务消费者1. 依赖2. 配置文件3. 消费者启动类4. 业务:实现远程调用...

CI/CD环境搭建

服务简介 Gitlab 官网&#xff1a;https://about.gitlab.com/ GitLab 是一个用于仓库管理系统的开源项目&#xff0c;使用Git作为代码管理工具&#xff0c;并在此基础上搭建起来的Web服务。安装方法是参考GitLab在GitHub上的Wiki页面。Gitlab是被广泛使用的基于git的开源代码管…...

API调试管理工具Postman下载及操作介绍

1.下载安装postman地址&#xff1a;https://www.getpostman.com/downloads/ 2.创建项目 3.创建请求API 然后点击save保存api 4.用一个变量保存主域名&#xff0c;方便后续操作 就类似下面的baseurl 5.创建新环境 6.添加变量&#xff08;如添加本地测试环境url——ba…...

vue集成百度地图,实现关键字搜索并自定义覆盖物,保存成静态图片

vue集成百度地图&#xff0c;实现关键字搜索并自定义覆盖物 index.html引入百度地图js <script type"text/javascript" src"https://api.map.baidu.com/api?v2.0&typewebgl&akxxxxxxwMprS7jIfPt354VdgP"></script>vue页面代码 <…...

Java中的Stream流

一、介绍 1. Stream流的作用 结合了Lambda表达式&#xff0c;简化集合、数组的操作。 2. Stream流的使用步骤 ①先得到一条Stream流&#xff0c;并把数据放上去&#xff1b; 获取方式方法名说明单列集合default Stream<E> stream()Collection中的默认方法双列集合无无…...

前端UI怎么防止用户反复提交?

方法1&#xff1a;禁用按钮 用户点击“xxx”按钮后&#xff0c;先禁用按钮&#xff0c;防止用户多次点击&#xff1b;待请求完成后&#xff0c;再解禁按钮。 方法2&#xff1a;防抖&#xff08;Debouncing&#xff09; 防抖是一种技术&#xff0c;它可以延迟执行函数&#xff0…...

OpenHarmony游戏应用程序-实现的一个手柄游戏

介绍 本篇Codelab是基于TS扩展的声明式开发范式编程语言&#xff0c;以及OpenHarmony的分布式能力实现的一个手柄游戏。 说明&#xff1a; 本示例涉及使用系统接口&#xff0c;需要手动替换Full SDK才能编译通过。 完成本篇Codelab需要两台开发板&#xff0c;一台开发板作为游…...

java_网络服务相关_gateway_nacos_feign区别联系

1. spring-cloud-starter-gateway 作用&#xff1a;作为微服务架构的网关&#xff0c;统一入口&#xff0c;处理所有外部请求。 核心能力&#xff1a; 路由转发&#xff08;基于路径、服务名等&#xff09;过滤器&#xff08;鉴权、限流、日志、Header 处理&#xff09;支持负…...

8k长序列建模,蛋白质语言模型Prot42仅利用目标蛋白序列即可生成高亲和力结合剂

蛋白质结合剂&#xff08;如抗体、抑制肽&#xff09;在疾病诊断、成像分析及靶向药物递送等关键场景中发挥着不可替代的作用。传统上&#xff0c;高特异性蛋白质结合剂的开发高度依赖噬菌体展示、定向进化等实验技术&#xff0c;但这类方法普遍面临资源消耗巨大、研发周期冗长…...

听写流程自动化实践,轻量级教育辅助

随着智能教育工具的发展&#xff0c;越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式&#xff0c;也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建&#xff0c;…...

IP如何挑?2025年海外专线IP如何购买?

你花了时间和预算买了IP&#xff0c;结果IP质量不佳&#xff0c;项目效率低下不说&#xff0c;还可能带来莫名的网络问题&#xff0c;是不是太闹心了&#xff1f;尤其是在面对海外专线IP时&#xff0c;到底怎么才能买到适合自己的呢&#xff1f;所以&#xff0c;挑IP绝对是个技…...

NPOI操作EXCEL文件 ——CAD C# 二次开发

缺点:dll.版本容易加载错误。CAD加载插件时&#xff0c;没有加载所有类库。插件运行过程中用到某个类库&#xff0c;会从CAD的安装目录找&#xff0c;找不到就报错了。 【方案2】让CAD在加载过程中把类库加载到内存 【方案3】是发现缺少了哪个库&#xff0c;就用插件程序加载进…...

android13 app的触摸问题定位分析流程

一、知识点 一般来说,触摸问题都是app层面出问题,我们可以在ViewRootImpl.java添加log的方式定位;如果是touchableRegion的计算问题,就会相对比较麻烦了,需要通过adb shell dumpsys input > input.log指令,且通过打印堆栈的方式,逐步定位问题,并找到修改方案。 问题…...

Chromium 136 编译指南 Windows篇:depot_tools 配置与源码获取(二)

引言 工欲善其事&#xff0c;必先利其器。在完成了 Visual Studio 2022 和 Windows SDK 的安装后&#xff0c;我们即将接触到 Chromium 开发生态中最核心的工具——depot_tools。这个由 Google 精心打造的工具集&#xff0c;就像是连接开发者与 Chromium 庞大代码库的智能桥梁…...

c++第七天 继承与派生2

这一篇文章主要内容是 派生类构造函数与析构函数 在派生类中重写基类成员 以及多继承 第一部分&#xff1a;派生类构造函数与析构函数 当创建一个派生类对象时&#xff0c;基类成员是如何初始化的&#xff1f; 1.当派生类对象创建的时候&#xff0c;基类成员的初始化顺序 …...

从“安全密码”到测试体系:Gitee Test 赋能关键领域软件质量保障

关键领域软件测试的"安全密码"&#xff1a;Gitee Test如何破解行业痛点 在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的"神经中枢"。从国防军工到能源电力&#xff0c;从金融交易到交通管控&#xff0c;这些关乎国计民生的关键领域…...

Unity VR/MR开发-VR开发与传统3D开发的差异

视频讲解链接&#xff1a;【XR马斯维】VR/MR开发与传统3D开发的差异【UnityVR/MR开发教程--入门】_哔哩哔哩_bilibili...