当前位置: 首页 > news >正文

Go语言使用 kafka-go 消费 Kafka 消息教程

Go语言使用 kafka-go 消费 Kafka 消息教程

在这篇教程中,我们将介绍如何使用 kafka-go 库来消费 Kafka 消息,并重点讲解 FetchMessageReadMessage 的区别,以及它们各自适用的场景。通过这篇教程,你将了解如何有效地使用 kafka-go 库来处理消息和管理偏移量。

安装 kafka-go

首先,你需要在项目中安装 kafka-go 库。可以使用以下命令:

go get github.com/segmentio/kafka-go
初始化 Kafka Reader

为了从 Kafka 消费消息,我们首先需要配置和初始化 Kafka Reader。以下是一个简单的 Kafka Reader 初始化示例:

package mainimport ("context""log""github.com/segmentio/kafka-go"
)func main() {// 创建 Kafka ReaderkafkaReader := kafka.NewReader(kafka.ReaderConfig{Brokers:   []string{"localhost:9092"}, // Kafka broker 地址Topic:     "example-topic",            // 订阅的 Kafka topicGroupID:   "example-group",            // 消费者组 IDPartition: 0,                          // 分区号 (可选)MinBytes:  10e3,                       // 10KBMaxBytes:  10e6,                       // 10MB})defer kafkaReader.Close()
}
使用 FetchMessage 消费消息

FetchMessage 允许你从 Kafka 消费消息并手动提交偏移量,这给你对消息处理的更精确控制。以下是如何使用 FetchMessage 的示例:

func consumeWithFetchMessage() {ctx := context.Background()for {// 从 Kafka 中获取下一条消息m, err := kafkaReader.FetchMessage(ctx)if err != nil {log.Printf("获取消息时出错: %v", err)break}// 打印消息内容log.Printf("消息: %s, 偏移量: %d", string(m.Value), m.Offset)// 处理消息 (在这里可以进行你的业务逻辑)// 手动提交偏移量if err := kafkaReader.CommitMessages(ctx, m); err != nil {log.Printf("提交偏移量时出错: %v", err)}}
}
优点
  • 精确控制偏移量:在处理消息后,你可以手动选择是否提交偏移量,这样可以确保只有在消息处理成功后才提交。
  • 重试机制:可以灵活地处理失败消息,例如在处理失败时,不提交偏移量,从而实现消息的重新消费。
缺点
  • 代码复杂度增加:需要手动处理偏移量提交,会增加一些额外的代码量。
使用 ReadMessage 消费消息

ReadMessage 是一种更简单的方式,从 Kafka 中获取消息并自动提交偏移量。适用于对消费逻辑不太敏感的场景。以下是使用 ReadMessage 的示例:

func consumeWithReadMessage() {ctx := context.Background()for {// 从 Kafka 中读取下一条消息并自动提交偏移量dataInfo, err := kafkaReader.ReadMessage(ctx)if err != nil {log.Printf("读取消息时出错: %v", err)break}// 打印消息内容log.Printf("消息: %s, 偏移量: %d", string(dataInfo.Value), dataInfo.Offset)// 处理消息 (在这里可以进行你的业务逻辑)}
}
优点
  • 简单易用ReadMessage 自动提交偏移量,代码简洁,易于维护。
  • 快速开发:适合简单的消息处理逻辑和对消息可靠性要求不高的场景。
缺点
  • 缺乏灵活性:无法在处理失败时重新消费消息,因为偏移量已经自动提交。
总结选择
方法优点缺点适用场景
FetchMessage需要手动提交偏移量,精确控制消息处理和提交逻辑代码复杂度较高需要精确控制消息处理的场景,例如处理失败重试
ReadMessage简单易用,自动提交偏移量,代码更简洁无法重新消费已处理失败的消息简单的消息处理,对消息处理成功率要求不高的场景
完整示例

以下是一个完整的 Kafka 消费者示例,包括 FetchMessageReadMessage 两种方法。可以根据你的需求选择合适的方法:

package mainimport ("context""log""github.com/segmentio/kafka-go"
)func main() {// 创建 Kafka ReaderkafkaReader := kafka.NewReader(kafka.ReaderConfig{Brokers:   []string{"localhost:9092"},Topic:     "example-topic",GroupID:   "example-group",MinBytes:  10e3, // 10KBMaxBytes:  10e6, // 10MB})defer kafkaReader.Close()// 使用 FetchMessage 消费消息log.Println("开始使用 FetchMessage 消费 Kafka 消息...")consumeWithFetchMessage(kafkaReader)// 使用 ReadMessage 消费消息log.Println("开始使用 ReadMessage 消费 Kafka 消息...")consumeWithReadMessage(kafkaReader)
}func consumeWithFetchMessage(kafkaReader *kafka.Reader) {ctx := context.Background()for {m, err := kafkaReader.FetchMessage(ctx)if err != nil {log.Printf("FetchMessage 获取消息时出错: %v", err)break}log.Printf("FetchMessage 消息: %s, 偏移量: %d", string(m.Value), m.Offset)// 手动提交偏移量if err := kafkaReader.CommitMessages(ctx, m); err != nil {log.Printf("FetchMessage 提交偏移量时出错: %v", err)}}
}func consumeWithReadMessage(kafkaReader *kafka.Reader) {ctx := context.Background()for {dataInfo, err := kafkaReader.ReadMessage(ctx)if err != nil {log.Printf("ReadMessage 读取消息时出错: %v", err)break}log.Printf("ReadMessage 消息: %s, 偏移量: %d", string(dataInfo.Value), dataInfo.Offset)}
}
结语

通过本教程,你学会了如何使用 kafka-goFetchMessageReadMessage 方法消费 Kafka 消息。根据项目需求选择合适的消费方式,合理管理偏移量以确保消息处理的可靠性和效率。

相关文章:

Go语言使用 kafka-go 消费 Kafka 消息教程

Go语言使用 kafka-go 消费 Kafka 消息教程 在这篇教程中,我们将介绍如何使用 kafka-go 库来消费 Kafka 消息,并重点讲解 FetchMessage 和 ReadMessage 的区别,以及它们各自适用的场景。通过这篇教程,你将了解如何有效地使用 kafk…...

【论文笔记】Number it: Temporal Grounding Videos like Flipping Manga

🍎个人主页:小嗷犬的个人主页 🍊个人网站:小嗷犬的技术小站 🥭个人信条:为天地立心,为生民立命,为往圣继绝学,为万世开太平。 基本信息 标题: Number it: Temporal Grou…...

C语言菜鸟入门·关键字·int的用法

目录 1. int关键字 1.1 取值范围 1.2 符号类型 1.3 运算 1.3.1 加法运算() 1.3.2 减法运算(-) 1.3.3 乘法运算(*) 1.3.4 除法运算(/) 1.3.5 取余运算(%) 1.3.6 自增()与自减(--) 1.3.7 位运算 2. 更多关键字 1. int关键字 int 是一个关键字&#xff0…...

基于企业微信客户端设计一个文件下载与预览系统

在企业内部沟通与协作中,文件分享和管理是不可或缺的一部分。企业微信(WeCom)作为一款广泛应用于企业的沟通工具,提供了丰富的API接口和功能,帮助企业进行高效的团队协作。然而,随着文件交换和协作的日益增…...

昇思MindSpore第七课---文本解码原理

1. 文本解码原理 文本解码是将模型的输出(通常是概率分布或词汇索引)转换为可读的自然语言文本的过程。在生成文本时,常见的解码方法包括贪心解码、束搜索(BeamSearch)、随机采样等。 2 实践 2.1 配置环境 安装mindn…...

C# 数据结构之【图】C#图

1. 图的概念 图是一种重要的数据结构,用于表示节点(顶点)之间的关系。图由一组顶点和连接这些顶点的边组成。图可以是有向的(边有方向)或无向的(边没有方向),可以是加权的&#xff…...

传输控制协议(TCP)和用户数据报协议(UDP)

一、传输控制协议(TCP) 传输控制协议(Transmission Control Protocol,TCP)是一种面向连接的、可靠的、基于字节流的传输层通信协议,由 IETF 的 RFC 793 定义。 它通过三次握手建立连接,确保数…...

【Python爬虫】Scrapy框架实战---百度首页热榜新闻

如何利用Scrapy框架实战提取百度首页热榜新闻的排名、标题和链接 一、安装Scrapy库 二、创建项目(以BaiduSpider为例) scrapy startproject BaiduSpider生成每个文件的功能: 二、 创建爬虫脚本(爬虫名:news&#xff…...

采用python3.12 +django5.1 结合 RabbitMQ 和发送邮件功能,实现一个简单的告警系统 前后端分离 vue-element

一、开发环境搭建和配置 #mac环境 brew install python3.12 python3.12 --version python3.12 -m pip install --upgrade pip python3.12 -m pip install Django5.1 python3.12 -m django --version #用于检索系统信息和进程管理 python3.12 -m pip install psutil #集成 pika…...

Qt 实现网络数据报文大小端数据的收发

1.大小端数据简介 大小端(Endianness)是计算机体系结构的一个术语,它描述了多字节数据在内存中的存储顺序。以下是大小端的定义和它们的特点: 大端(Big-Endian) 在大端模式中,一个字的最高有效…...

[译]Elasticsearch Sequence ID实现思路及用途

原文地址:https://www.elastic.co/blog/elasticsearch-sequence-ids-6-0 如果 几年前,在Elastic,我们问自己一个"如果"问题,我们知道这将带来有趣的见解: "如果我们在Elasticsearch中对索引操作进行全面排序会怎样…...

Java基于SpringBoot+Vue的藏区特产销售平台

博主介绍:✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 🍅文末获取源码联系🍅 👇🏻 精彩专栏推荐订阅👇…...

12-表的约束

知识背景 表的约束,就是在表中的数据上加上约束,也被称为数据完整性约束。数据完整性约束的目的是为了不被规定的、不符合规范的数据进入数据库 在录入数据库或数据发生变化时,DBMS(数据库管理系统)会按照一定的约束条件对数据进行监测&…...

【人工智能】深度学习入门:用TensorFlow实现多层感知器(MLP)模型

《Python OpenCV从菜鸟到高手》带你进入图像处理与计算机视觉的大门! 多层感知器(MLP)是一种基础的神经网络结构,广泛应用于分类和回归任务。作为深度学习的重要组成部分,理解并实现MLP是学习更复杂神经网络模型的基础。本文将介绍多层感知器的核心概念、数学原理,并使用…...

【Go】-go中的锁机制

目录 一、锁的基础知识 1. 互斥量/互斥锁 2. CAS(compare and swap) 3. 自旋锁 4. 读写锁 5. 乐观锁 & 悲观锁 6. 死锁 二、go中锁机制 1. Mutex-互斥锁 2. RWMutex-读写锁 2.1 RWMutex流程概览 2.2 写锁饥饿问题 2.3. golang的读写锁源…...

c ++零基础可视化——vector

c 零基础可视化——vector 初始化 vector<int> v0(5); // 0 0 0 0 0 vector<int> v1(5, 1); // 1 1 1 1 1 vector<int> v2{1, 2, 3} // 1 2 3 vector<int> v3(v1); // 1 1 1 1 1 vector<vector<int>> v4(2, vect…...

Centos 7 安装 Docker 最新版本

文章目录 一、卸载旧版本二、安装最新版本docker三、问题解决3.1 启动docker报错3.2 启动容器报错 一、卸载旧版本 #如果之前安装过旧版本的Docker&#xff0c;可以使用下面命令卸载 yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest …...

构建高效在线教育:SpringBoot课程管理系统

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理在线课程管理系统的相关信息成为必然。开发…...

二进制与网络安全的关系

二进制与网络安全的关系 声明&#xff01; 学习视频来自B站up主 泷羽sec 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章&#xff0c;笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他均与本人以…...

【计算机网络】网段划分

一、为什么有网段划分 IP地址 网络号(目标网络) 主机号(目标主机) 网络号: 保证相互连接的两个网段具有不同的标识 主机号: 同一网段内&#xff0c;主机之间具有相同的网络号&#xff0c;但是必须有不同的主机号 互联网中的每一台主机&#xff0c;都要隶属于某一个子网 -&…...

Leetcode 3576. Transform Array to All Equal Elements

Leetcode 3576. Transform Array to All Equal Elements 1. 解题思路2. 代码实现 题目链接&#xff1a;3576. Transform Array to All Equal Elements 1. 解题思路 这一题思路上就是分别考察一下是否能将其转化为全1或者全-1数组即可。 至于每一种情况是否可以达到&#xf…...

React Native 开发环境搭建(全平台详解)

React Native 开发环境搭建&#xff08;全平台详解&#xff09; 在开始使用 React Native 开发移动应用之前&#xff0c;正确设置开发环境是至关重要的一步。本文将为你提供一份全面的指南&#xff0c;涵盖 macOS 和 Windows 平台的配置步骤&#xff0c;如何在 Android 和 iOS…...

家政维修平台实战20:权限设计

目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系&#xff0c;主要是分成几个表&#xff0c;用户表我们是记录用户的基础信息&#xff0c;包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题&#xff0c;不同的角色&#xf…...

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

【AI学习】三、AI算法中的向量

在人工智能&#xff08;AI&#xff09;算法中&#xff0c;向量&#xff08;Vector&#xff09;是一种将现实世界中的数据&#xff08;如图像、文本、音频等&#xff09;转化为计算机可处理的数值型特征表示的工具。它是连接人类认知&#xff08;如语义、视觉特征&#xff09;与…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)

骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术&#xff0c;它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton)&#xff1a;由层级结构的骨头组成&#xff0c;类似于人体骨骼蒙皮 (Mesh Skinning)&#xff1a;将模型网格顶点绑定到骨骼上&#xff0c;使骨骼移动…...

七、数据库的完整性

七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...

安全突围:重塑内生安全体系:齐向东在2025年BCS大会的演讲

文章目录 前言第一部分&#xff1a;体系力量是突围之钥第一重困境是体系思想落地不畅。第二重困境是大小体系融合瓶颈。第三重困境是“小体系”运营梗阻。 第二部分&#xff1a;体系矛盾是突围之障一是数据孤岛的障碍。二是投入不足的障碍。三是新旧兼容难的障碍。 第三部分&am…...

Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践

在 Kubernetes 集群中&#xff0c;如何在保障应用高可用的同时有效地管理资源&#xff0c;一直是运维人员和开发者关注的重点。随着微服务架构的普及&#xff0c;集群内各个服务的负载波动日趋明显&#xff0c;传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...