golang kafka sarama 源码解析
- 消费者组重平衡
github.com/!shopify/sarama@v1.27.2/consumer_group.go
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err := c.client.Coordinator(c.groupID)if err != nil {if retries <= 0 {return nil, err}return c.retryNewSession(ctx, topics, handler, retries, true)}// 申请加入组// Join consumer groupjoin, err := c.joinGroupRequest(coordinator, topics)if err != nil {_ = coordinator.Close()return nil, err}switch join.Err {case ErrNoError:c.memberID = join.MemberIdcase ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, true)// 已经在重平衡期间case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, join.Err}// 消费者组中的一个消费者作为leader,进行分区方案分配// Prepare distribution plan if we joined as the leadervar plan BalanceStrategyPlanif join.LeaderId == join.MemberId {members, err := join.GetMembers()if err != nil {return nil, err}// 分配分区plan, err = c.balance(members)if err != nil {return nil, err}}// 同步给kafka,只有 leader会带上分区方案// Sync consumer groupgroupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)if err != nil {_ = coordinator.Close()return nil, err}switch groupRequest.Err {case ErrNoError:case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, true)case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, groupRequest.Err}// Retrieve and sort claimsvar claims map[string][]int32 // topic->partions// 如果有可消费的分区if len(groupRequest.MemberAssignment) > 0 {members, err := groupRequest.GetMemberAssignment()if err != nil {return nil, err}claims = members.Topicsc.userData = members.UserDatafor _, partitions := range claims {sort.Sort(int32Slice(partitions))}}return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
- 消费者拉取消息
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {bc := &brokerConsumer{consumer: c,broker: broker,input: make(chan *partitionConsumer),newSubscriptions: make(chan []*partitionConsumer),wait: make(chan none),subscriptions: make(map[*partitionConsumer]none),refs: 0,}go withRecover(bc.subscriptionManager)go withRecover(bc.subscriptionConsumer)return bc
}相关文章:
golang kafka sarama 源码解析
消费者组重平衡 github.com/!shopify/saramav1.27.2/consumer_group.go func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err :…...
Flutter知识点整理
JVM 1.Flutter Hot reload 实现原理 一、原理概述 Hot Reload 只能在 Debug 模式下使用,因为 Debug 模式下,Flutter 采用的是 JIT( 动态编译),代码是运行在 Dart VM 上,JIT 将 Dart 编译成可以运行在 Dart…...
现代游戏引擎架构
一、并行编程 1.1 为什么需要并行编程 游戏的渲染计算对算力要求很高,所以我们需要把操作系统的资源利用到极致。 但是摩尔定律已经不在适用了,硬件的发展目前已经达到瓶颈。所以我们需要通过数量来提高计算效率。 1.2 并行编程基础 进程与线程&#…...
深度学习:复杂工业场景下的复杂缺陷检测方法
摘要:在复杂的工业场景中,缺陷检测一直是一个重要而具有挑战性的任务。近年来,深度学习技术的快速发展为复杂工业场景下的缺陷检测提供了新的解决方案。本文将介绍深度学习在复杂工业场景下的复杂缺陷检测中的应用,并探讨其技术进…...
CSDN个人简介优化 html font属性
CSDN个人简介优化 html font属性 个人简介个人简介优化字体21种样式选择字体大小设置4号字体 字体颜色设计渐变色(可惜不能显示) 字体加粗设置 <b>标签 个人简介 🌈你好呀!我是 是Yu欸 🌌 2024每日百字篆刻时光…...
从哈希桶角度看 unordered_map 与 unordered_set 的实现
文章目录 一、引言二、C unordered系列的无序关联式容器概览三、基于哈希桶的C unordered系列数据结构模拟实现1、unordered_map的模拟实现2、unordered_set的模拟实现3、哈希桶及其迭代器实现的代码 四、扩展与应用1. 自定义哈希函数2. 其他unordered数据结构unordered_multim…...
飞天使-k8s知识点27-kubernetes温故知新2-deployment
文章目录 RC和RS无状态应用管理 deployment有状态应用statefulSetdaemonSet RC和RS RC不会使用在生产环境 RS 比RC 多了标签选择器 ,RS 用deployment管理,用于容器编排无状态应用管理 deployment apiVersion: apps/v1 kind: Deployment metadata:name:…...
手机网页关键词视频爬虫采集软件可导出视频分享链接|视频无水印批量下载工具
全新音视频批量下载工具,为您解放视频管理烦恼! 现如今,音上涌现出大量精彩的视频内容,但是要想高效地获取、管理和分享这些视频却是一件颇具挑战的事情。针对这一难题,我们自主研发了全新的音视频批量下载工具&#x…...
基于OpenCV的图像处理案例之图像矫正(Python)
Index 目录索引 写在前面解决思路参考 写在前面 本文通过一个案例介绍如何使用OpenCV将倾斜的扫描文档图像进行水平矫正。 解决思路 因为扫描图像中的大部分文字倾斜后,同一行文字也在同一条直线,所以可以通过拟合直线来计算文本倾斜角度,…...
创建linux虚拟机系统:(安装Ubuntu镜像文件,包含语言设置、中文输入法、时间设置)
我下载的是清华大写开源软件镜像站中的ubuntu-20.04.6-desktop-amd64.iso这个镜像文件, 这个文件我下载完成之后没有解压,直接在创建虚拟机的时候选择的压缩包。 地址为:Index of /ubuntu-releases/20.04/ | 清华大学开源软件镜像站 | Tsin…...
3.0 V-22V 宽输入电压,高效率异步升压芯片-ZCC5429
一、产品综述 ZCC5429 芯片是一款自动调频、最高 600KHz工作频率、高效率、宽输入电压范围的电流模式异 步升压(BOOST)芯片,且可调输入限流功能。用户可灵活地通过外部补偿建立动态环路,获得在所有条件下最优瞬态性能。 ZCC542…...
Sphinx + Readthedocs 避坑速通指南
博主在学习使用 Sphinx 和 Read the docs 的过程中, 碰到了许多奇葩的 bug, 使得很简单的任务花费了很长的时间才解决,现在在这里做一个分享,帮助大家用更少的时间高效上线文档的内容。 总的来说, 任务分为两个部分: …...
IPP-7010 表面贴装 90 度混合耦合器
IPP-7010 表面贴装 90 度混合耦合器 IPP-7010 是一款表面贴装 90 度混合耦合器,工作频率为 800 至 2500 MHz(0.8 至 2.5 GHz),平均额定功率为 200 瓦。IPP-7010 采用 0.40 x 1.80 英寸表面贴装封装。IPP-7010的幅度平衡小于0.6dB&…...
25.2 微服务Dubbo
25.2 微服务Dubbo 1. Dubbo简介2. RPC3. Dubbo工作原理4. 代码实操4.1 父项目1. 依赖4.2 服务提供者1. 依赖2. 配置文件3. 启动类4. 业务类4.3 服务消费者1. 依赖2. 配置文件3. 消费者启动类4. 业务:实现远程调用...
CI/CD环境搭建
服务简介 Gitlab 官网:https://about.gitlab.com/ GitLab 是一个用于仓库管理系统的开源项目,使用Git作为代码管理工具,并在此基础上搭建起来的Web服务。安装方法是参考GitLab在GitHub上的Wiki页面。Gitlab是被广泛使用的基于git的开源代码管…...
API调试管理工具Postman下载及操作介绍
1.下载安装postman地址:https://www.getpostman.com/downloads/ 2.创建项目 3.创建请求API 然后点击save保存api 4.用一个变量保存主域名,方便后续操作 就类似下面的baseurl 5.创建新环境 6.添加变量(如添加本地测试环境url——ba…...
vue集成百度地图,实现关键字搜索并自定义覆盖物,保存成静态图片
vue集成百度地图,实现关键字搜索并自定义覆盖物 index.html引入百度地图js <script type"text/javascript" src"https://api.map.baidu.com/api?v2.0&typewebgl&akxxxxxxwMprS7jIfPt354VdgP"></script>vue页面代码 <…...
Java中的Stream流
一、介绍 1. Stream流的作用 结合了Lambda表达式,简化集合、数组的操作。 2. Stream流的使用步骤 ①先得到一条Stream流,并把数据放上去; 获取方式方法名说明单列集合default Stream<E> stream()Collection中的默认方法双列集合无无…...
前端UI怎么防止用户反复提交?
方法1:禁用按钮 用户点击“xxx”按钮后,先禁用按钮,防止用户多次点击;待请求完成后,再解禁按钮。 方法2:防抖(Debouncing) 防抖是一种技术,它可以延迟执行函数࿰…...
OpenHarmony游戏应用程序-实现的一个手柄游戏
介绍 本篇Codelab是基于TS扩展的声明式开发范式编程语言,以及OpenHarmony的分布式能力实现的一个手柄游戏。 说明: 本示例涉及使用系统接口,需要手动替换Full SDK才能编译通过。 完成本篇Codelab需要两台开发板,一台开发板作为游…...
Vim 调用外部命令学习笔记
Vim 外部命令集成完全指南 文章目录 Vim 外部命令集成完全指南核心概念理解命令语法解析语法对比 常用外部命令详解文本排序与去重文本筛选与搜索高级 grep 搜索技巧文本替换与编辑字符处理高级文本处理编程语言处理其他实用命令 范围操作示例指定行范围处理复合命令示例 实用技…...
Cursor实现用excel数据填充word模版的方法
cursor主页:https://www.cursor.com/ 任务目标:把excel格式的数据里的单元格,按照某一个固定模版填充到word中 文章目录 注意事项逐步生成程序1. 确定格式2. 调试程序 注意事项 直接给一个excel文件和最终呈现的word文件的示例,…...
SciencePlots——绘制论文中的图片
文章目录 安装一、风格二、1 资源 安装 # 安装最新版 pip install githttps://github.com/garrettj403/SciencePlots.git# 安装稳定版 pip install SciencePlots一、风格 简单好用的深度学习论文绘图专用工具包–Science Plot 二、 1 资源 论文绘图神器来了:一行…...
前端导出带有合并单元格的列表
// 导出async function exportExcel(fileName "共识调整.xlsx") {// 所有数据const exportData await getAllMainData();// 表头内容let fitstTitleList [];const secondTitleList [];allColumns.value.forEach(column > {if (!column.children) {fitstTitleL…...
cf2117E
原题链接:https://codeforces.com/contest/2117/problem/E 题目背景: 给定两个数组a,b,可以执行多次以下操作:选择 i (1 < i < n - 1),并设置 或,也可以在执行上述操作前执行一次删除任意 和 。求…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序
一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...
什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南
文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果 👦 小明最近上线了校园奶茶配送功能,业务火爆,大家都在加料: 有的同学要加波霸 🟤,有的要加椰果…...
