142. Go操作Kafka(confluent-kafka-go库)
文章目录
- Apache kafka简介
- 开始使用Apache Kafka
- 构建生产者
- 构建消费者
- 总结
之前已经有两篇文章介绍过
Go如何操作
kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
51.Go操作kafka示例(kafka-go库)
Apache kafka简介

Apache Kafka是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键任务应用程序。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种能力。
Kafka的用例和能力
-
流数据管道:
Kafka提供了一个分布式发布-订阅消息系统,可以在系统或应用程序之间流式传输数据。它提供了具有数据复制和容错能力的强大队列。 -
实时分析:
Kafka允许使用工具如Kafka Streams和KSQL处理实时数据流,用于构建流式分析和数据处理应用程序。 -
数据集成 :
Kafka可以用来通过在不同数据源和格式之间流式传输数据来集成不同的系统。这使它对流式ETL非常有用。 -
事件源 :
Kafka提供了可以重放的事件时间日志,用于重构应用程序状态,适用于事件源和CQRS模式。 -
日志聚合 :
Kafka通常用于将不同服务器和应用程序的日志聚合到一个中央存储库中。这允许统一访问日志数据。
为什么将Golang与Apache Kafka结合使用
将Golang这一高效并发的编程语言与Apache Kafka这一分布式事件流平台结合起来,提供了一个在构建尖端现代应用程序方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:
-
性能 :
Golang和Apache Kafka都提供高性能。Golang快速、高效和轻量级。Kafka为速度而构建,具有高吞吐量和低延迟。它们一起可以处理苛刻的工作负载。 -
可扩展性 :
Golang的goroutines和Kafka的分区允许应用程序水平扩展以处理大量数据。Kafka可以轻松扩展生产者和消费者。 -
并发性 :
Golang通过goroutines和channels提供了出色的并发编程能力。Kafka并发传递消息并支持并行性。 -
可用性 :
Kafka的分布式架构使其高度可用和容错。Golang应用可以利用这一点来构建弹性系统。 -
互操作性 :
Kafka有多种语言的客户端,允许Golang应用与多语言环境互动。Kafka还使用二进制TCP协议以提高效率。 -
现代设计 :
Kafka和Golang都采用现代设计理念,使它们非常适合云原生和微服务架构。 -
开发人员体验 :
Kafka的客户端库结合Goroutines、channels和接口,使其易于使用。
Kafka和Golang将性能、可扩展性和并发与生产力结合在一起 - 使它们成为构建可扩展的服务、管道和流应用程序的绝佳选择。
开始使用Apache Kafka
在开始使用Golang和Apache Kafka之前,我们必须确保golang和Kafka已经安装并在我们的机器上运行。
安装Kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
Apache Kafka的Golang包
您可以使用go get安装confluent-kafka-go包:
go get -u github.com/confluentinc/confluent-kafka-go/kafka
安装后,您可以在Go代码中导入并使用confluent-kafka-go。
package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})if err != nil {fmt.Printf("创建生产者失败: %s\n", err)return}// 生产消息到主题,处理交付报告等。// 使用后记得关闭生产者defer p.Close()
}
构建生产者
Kafka生产者是Apache Kafka生态系统中的一个关键组成部分,作为一个客户端应用程序,负责向Kafka集群发布(写入)事件。这一部分提供了关于Kafka生产者的全面概述,以及针对调整其行为的配置设置的初步探讨。
下面是一个Golang应用程序的示例,它生产数据并将其发布到Kafka的具体topic。它还说明了如何在Golang中为Kafka消息序列化数据,并演示了如何处理错误和重试。
package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)const (kafkaBroker = "localhost:9092"topic = "test-topic"
)type Messagestruct {Key string `json:"key"`Value string `json:"value"`
}func main() {// 创建一个新的Kafka生产者p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})if err != nil {fmt.Printf("创建生产者失败: %s\n", err)return}defer p.Close()// 定义要发送的消息message := Message{Key: "example_key",Value: "Hello, Kafka!",}// 序列化消息serializedMessage, err := serializeMessage(message)if err != nil {fmt.Printf("消息序列化失败: %s\n", err)return}// 将消息生产到Kafka主题err = produceMessage(p, topic, serializedMessage)if err != nil {fmt.Printf("消息生产失败: %s\n", err)return}fmt.Println("消息成功生产!")
}func serializeMessage(message Message) ([]byte, error) {// 将消息结构体序列化为JSONserialized, err := json.Marshal(message)if err != nil {return nil, fmt.Errorf("消息序列化失败: %w", err)}return serialized, nil
}func produceMessage(p *kafka.Producer, topic string, message []byte) error {// 创建一个新的要生产的Kafka消息kafkaMessage := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value: message,}// 生产Kafka消息deliveryChan := make(chan kafka.Event)err := p.Produce(kafkaMessage, deliveryChan)if err != nil {return fmt.Errorf("消息生产失败: %w", err)}// 等待交付报告或错误e := <-deliveryChanm := e.(*kafka.Message)// 检查交付错误,即生成者方确保发送到Broker的消息不丢失// 但可能重复发送,如实际发成功了,但是Broker的ACK返回给生产者时出现了网络Error// 从而重试后,导致消息重复发送,这时候需要下游做好幂等处理if m.TopicPartition.Error != nil {return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)}// 关闭交付频道close(deliveryChan)return nil
}
步骤解释:
-
创建一个
Kafka生产者。 -
使用
json.Marshal函数将自定义消息结构体(Message)序列化为JSON。 -
使用生产者将序列化的消息生产到
Kafka topic。 -
使用交付报告和错误检查处理错误和重试。
确保将localhost:9092替换为您的Kafka代理地址,将test-topic替换为所需的主题名称。此外,您可能需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑。
构建消费者
Kafka消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探讨这些消费者的内部工作原理和调整其性能的配置。
下面是一个Golang应用程序的示例,它从Kafka主题消费消息。如下代码包括了如何处理消费到的消息的说明,以及对不同消费模式(如单个消费者和消费者组)的讨论。
package mainimport ("fmt""os""os/signal""github.com/confluentinc/confluent-kafka-go/kafka"
)const (kafkaBroker = "localhost:9092"topic = "test-topic"groupID = "test-group"
)func main() {// 创建一个新的Kafka消费者c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker,"group.id": groupID, // 消费者组标识"auto.offset.reset": "earliest", // 从头开始消费})if err != nil {fmt.Printf("创建消费者失败: %s\n", err)return}defer c.Close()// 订阅Kafka主题err = c.SubscribeTopics([]string{topic}, nil)if err != nil {fmt.Printf("订阅主题失败: %s\n", err)return}// 设置一个通道来处理操作系统信号,以便优雅地关闭sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, os.Interrupt)// 开始消费消息run := truefor run == true {select {case sig := <-sigchan:fmt.Printf("接收到信号 %v: 正在终止\n", sig)run = falsedefault:// 轮询Kafka消息,1次最多拉取100条消息ev := c.Poll(100) if ev == nil {continue}switch e := ev.(type) {case *kafka.Message:// 处理消费的消息fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))case kafka.Error:// 处理Kafka错误fmt.Printf("错误: %v\n", e)}}}
}
步骤解释:
-
创建一个
Kafka消费者。 -
订阅一个
Kafka主题。 -
设置一个通道来处理操作系统信号(如
SIGINT)以优雅地关闭。 -
开始从订阅的
Topic消费消息。 -
处理消费的消息以及
Kafka错误。
不同的消费模式:
-
单个消费者:在这种模式下,单个消费者实例从主题的一个或多个分区读取消息。当您只需要一个消费者应用程序实例来处理来自
Topic的所有消息时,这很有用。 -
消费者组:消费者组允许您通过将消息处理分布到多个消费者实例来扩展消费,以实现扩展。每个消费者组可以有多个消费者,组内的每个消费者从一部分分区读取消息。这使得消息的并行处理成为可能,提供了容错能力和高吞吐量。
在提供的示例中,group.id配置设置用于指定消费者组ID。这允许消费者应用程序的多个实例在消费者组中一起工作,从Kafka Topic消费消息。
总结
总之,Apache Kafka作为构建实时数据管道和流应用程序的强大解决方案,得益于其分布式、可扩展和容错的架构。当与Golang结合时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常适合现代应用程序。通过利用Kafka的功能和Golang的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用程序,这些应用程序可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成不同的系统还是聚合日志,Kafka和Golang提供了一个强势组合,使开发人员能够轻松构建创新和可扩展的解决方案。
相关文章:
142. Go操作Kafka(confluent-kafka-go库)
文章目录 Apache kafka简介开始使用Apache Kafka构建生产者构建消费者 总结 之前已经有两篇文章介绍过 Go如何操作 kafka 28.windows安装kafka,Go操作kafka示例(sarama库) 51.Go操作kafka示例(kafka-go库) Apache ka…...
spring boot(学习笔记第十九课)
spring boot(学习笔记第十九课) Spring boot的batch框架,以及Swagger3(OpenAPI)整合 学习内容: Spring boot的batch框架Spring boot的Swagger3(OpenAPI)整合 1. Spring boot batch框架 Spring Batch是什么 Spring Batch 是一个…...
docker安装 redis 并且加密开启SSL/TLS通道
拉取镜像 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/redis:latest docker tag registry.cn-hangzhou.aliyuncs.com/qiluo-images/redis:latest redis:latest要在 Docker 容器中启动 Redis 并开启 SSL/TLS 加密,需按照以下步骤修改启动命令和配置…...
什么是ARM架构?什么是X86架构?两者的区别是什么?
一、什么是ARM架构 (一)起源于发展 ARM 架构由英国剑桥的 Acorn 计算机公司开发。因市场无合适产品,Acorn 自行设计出第一款微处理器,命名为 ARM。此后 ARM 架构不断发展,1990 年为与苹果合作成立 ARM 公司࿰…...
【vscode】vscode paste image插件设置
本文首发于 ❄️慕雪的寒舍 vscode编辑md文件的时候,如果想插入图片,自带的粘贴只会粘贴到当前目录下,也没有文件重命名,很不友好。 在扩展商店里面有mushan的Paste Image插件,相比自带的,更加友好一点。但…...
自定义string类
#include <iostream> #include <string> int main() { std::string str "Hello, World!"; // 使用 c_str() 将 std::string 转换为 C 风格字符串,并传递给 printf printf("The string is: %s\n", str.c_str()); // 尝试修改…...
Python | Leetcode Python题解之第387题字符串中的第一个唯一字符
题目: 题解: class Solution:def firstUniqChar(self, s: str) -> int:position dict()q collections.deque()n len(s)for i, ch in enumerate(s):if ch not in position:position[ch] iq.append((s[i], i))else:position[ch] -1while q and po…...
RocketMQ 消费时序列化报错问题分析及解决
问题背景 在2024年3月7日,系统消费 RocketMQ 消息时出现了序列化报错,错误信息显示为: java.io.InvalidClassException: com.xxx.xxx.bean.mg.GoodsChangeLogMessage; local class incompatible: stream classdesc serialVersionUID... 这是…...
全能与专精:探索未来AI模型的发展趋势与市场潜力
文章目录 每日一句正能量前言AI模型的全面评估和比较AI模型的专精化和可扩展性AI模型的合理使用和道德规范后记 每日一句正能量 一个人,如果没有经受过投资失败的痛楚,又怎么会看到绝望之后的海阔天空。很多时候,经历了人生中最艰难的事&…...
Python深度学习:【开源数据集系列】ImageNet数据集
ImageNet 是一个大规模的视觉数据集,是计算机视觉领域最重要的基准数据集之一。该数据集由普林斯顿大学和斯坦福大学的研究人员发起,于 2009 年推出。ImageNet 是用于物体分类、目标检测、图像分割、姿势估计等多种任务的通用数据集,尤其在深度学习和计算机视觉的突破性研究…...
微信小程序手写签名
微信小程序手写签名组件 该组件基于signature_pad封装,signature_pad本身是web端的插件,此处将插件代码修改为小程序端可用。 signature_pad.js /*!* Signature Pad v5.0.3 | https://github.com/szimek/signature_pad* (c) 2024 Szymon Nowak | Releas…...
Javascript 使用中点查找矩形的角(Find Corners of Rectangle using mid points)
考虑一个矩形 ABCD,我们给出了边 AD 和 BC 中点(分别为 p 和 q)的坐标以及它们的长度 L(AD BC L)。现在给定参数,我们需要打印 4 个点 A、B、C 和 D 的坐标。 例子: 输入:p (1,…...
【困难】 猿人学web第一届 第18题 jsvmp 洞察先机
文章目录 数据接口分析还原加密参数插桩调试分析日志插桩补充 python 代码 数据接口分析 数据接口 https://match.yuanrenxue.cn/match/18data 请求参数 {page: 页码, t: 时间戳, v: 加密值} 请求第一页不需要携带 t, v 参数 cookie 只需要携带 sessionid 只要 还原加密字段…...
IDEA Maven 源修改为国内阿里云镜像的正确方式
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storm…...
OpenCV 旋转矩形边界
边界矩形是用最小面积绘制的,所以它也考虑了旋转。使用的函数是**cv.minAreaRect**()。 import cv2 import numpy as npimgcv2.imread(rD:\PythonProject\thunder.jpg) img1cv2.cvtColor(img,cv2.COLOR_BGR2GRAY) print(img.dtype) ret,threshcv2.threshold(img1,1…...
人车防撞系统安全生产方案
根据《市场监管总局关于2021~2023年全国特种设备安全状况的通告》数据显示:2023年:全国共发生特种设备事故和相关事故71起,其中死亡69人。包含叉车在内的场(厂)内专用机动车辆事故29起、死亡28人,占事故总数的40.85%、死亡人数的4…...
开放式耳机哪个牌子好?长文传授6招秘籍,彻底远离坑货!
大家好,作为一位专注于评测各类数码产品的博主,今天我特别推荐开放式耳机作为我们日常的首选。这种耳机以其独特的设计,避免了传统耳机长时间佩戴可能带来的不适和感染风险。开放式耳机佩戴简便且稳固,尤其适合热爱跑步和运动的…...
vue2和vue3双向绑定的原理
Vue.js 的双向绑定是 Vue 框架的核心特性之一,它允许数据和视图之间保持同步。虽然 Vue 2 和 Vue 3 都实现了双向绑定,但它们在实现细节上有所不同。 Vue 2 双向绑定的原理 在 Vue 2 中,双向绑定主要依赖于 Object.defineProperty 和观察者…...
别为大文件烦恼!mp4文件太大怎么变小?3个管用方法
你是否曾经遇到过mp4视频文件过大的困扰?每当想要分享或存储mp4文件时,巨大的文件就成了阻碍。明明感觉感觉没占用多少空间,但是设备却常常出现空间过满警告。 没多少空间的设备真是让人大为恼火,没人想多花一份钱买设备。那么只…...
cocotb的接收和发送逻辑,还是没有弄明白
发送有两种方式 1、定义这样的发 通过前缀连接DUT里面的信号 发送的时候,通过.去访问就可以 2、如果是AXI总线,可以直接调用cocotb的库文件 AXIS总线可以包含以下的信号 通过这个类,可以产生一个AXIS的一帧数据 类的实现大概如下 然后也…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
TRS收益互换:跨境资本流动的金融创新工具与系统化解决方案
一、TRS收益互换的本质与业务逻辑 (一)概念解析 TRS(Total Return Swap)收益互换是一种金融衍生工具,指交易双方约定在未来一定期限内,基于特定资产或指数的表现进行现金流交换的协议。其核心特征包括&am…...
NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...
html-<abbr> 缩写或首字母缩略词
定义与作用 <abbr> 标签用于表示缩写或首字母缩略词,它可以帮助用户更好地理解缩写的含义,尤其是对于那些不熟悉该缩写的用户。 title 属性的内容提供了缩写的详细说明。当用户将鼠标悬停在缩写上时,会显示一个提示框。 示例&#x…...
基于Java Swing的电子通讯录设计与实现:附系统托盘功能代码详解
JAVASQL电子通讯录带系统托盘 一、系统概述 本电子通讯录系统采用Java Swing开发桌面应用,结合SQLite数据库实现联系人管理功能,并集成系统托盘功能提升用户体验。系统支持联系人的增删改查、分组管理、搜索过滤等功能,同时可以最小化到系统…...
招商蛇口 | 执笔CID,启幕低密生活新境
作为中国城市生长的力量,招商蛇口以“美好生活承载者”为使命,深耕全球111座城市,以央企担当匠造时代理想人居。从深圳湾的开拓基因到西安高新CID的战略落子,招商蛇口始终与城市发展同频共振,以建筑诠释对土地与生活的…...
【网络安全】开源系统getshell漏洞挖掘
审计过程: 在入口文件admin/index.php中: 用户可以通过m,c,a等参数控制加载的文件和方法,在app/system/entrance.php中存在重点代码: 当M_TYPE system并且M_MODULE include时,会设置常量PATH_OWN_FILE为PATH_APP.M_T…...
WPF八大法则:告别模态窗口卡顿
⚙️ 核心问题:阻塞式模态窗口的缺陷 原始代码中ShowDialog()会阻塞UI线程,导致后续逻辑无法执行: var result modalWindow.ShowDialog(); // 线程阻塞 ProcessResult(result); // 必须等待窗口关闭根本问题:…...
保姆级【快数学会Android端“动画“】+ 实现补间动画和逐帧动画!!!
目录 补间动画 1.创建资源文件夹 2.设置文件夹类型 3.创建.xml文件 4.样式设计 5.动画设置 6.动画的实现 内容拓展 7.在原基础上继续添加.xml文件 8.xml代码编写 (1)rotate_anim (2)scale_anim (3)translate_anim 9.MainActivity.java代码汇总 10.效果展示 逐帧…...
