142. Go操作Kafka(confluent-kafka-go库)
文章目录
- Apache kafka简介
- 开始使用Apache Kafka
- 构建生产者
- 构建消费者
- 总结
之前已经有两篇文章介绍过
Go如何操作
kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
51.Go操作kafka示例(kafka-go库)
Apache kafka简介

Apache Kafka是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键任务应用程序。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种能力。
Kafka的用例和能力
-
流数据管道:
Kafka提供了一个分布式发布-订阅消息系统,可以在系统或应用程序之间流式传输数据。它提供了具有数据复制和容错能力的强大队列。 -
实时分析:
Kafka允许使用工具如Kafka Streams和KSQL处理实时数据流,用于构建流式分析和数据处理应用程序。 -
数据集成 :
Kafka可以用来通过在不同数据源和格式之间流式传输数据来集成不同的系统。这使它对流式ETL非常有用。 -
事件源 :
Kafka提供了可以重放的事件时间日志,用于重构应用程序状态,适用于事件源和CQRS模式。 -
日志聚合 :
Kafka通常用于将不同服务器和应用程序的日志聚合到一个中央存储库中。这允许统一访问日志数据。
为什么将Golang与Apache Kafka结合使用
将Golang这一高效并发的编程语言与Apache Kafka这一分布式事件流平台结合起来,提供了一个在构建尖端现代应用程序方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:
-
性能 :
Golang和Apache Kafka都提供高性能。Golang快速、高效和轻量级。Kafka为速度而构建,具有高吞吐量和低延迟。它们一起可以处理苛刻的工作负载。 -
可扩展性 :
Golang的goroutines和Kafka的分区允许应用程序水平扩展以处理大量数据。Kafka可以轻松扩展生产者和消费者。 -
并发性 :
Golang通过goroutines和channels提供了出色的并发编程能力。Kafka并发传递消息并支持并行性。 -
可用性 :
Kafka的分布式架构使其高度可用和容错。Golang应用可以利用这一点来构建弹性系统。 -
互操作性 :
Kafka有多种语言的客户端,允许Golang应用与多语言环境互动。Kafka还使用二进制TCP协议以提高效率。 -
现代设计 :
Kafka和Golang都采用现代设计理念,使它们非常适合云原生和微服务架构。 -
开发人员体验 :
Kafka的客户端库结合Goroutines、channels和接口,使其易于使用。
Kafka和Golang将性能、可扩展性和并发与生产力结合在一起 - 使它们成为构建可扩展的服务、管道和流应用程序的绝佳选择。
开始使用Apache Kafka
在开始使用Golang和Apache Kafka之前,我们必须确保golang和Kafka已经安装并在我们的机器上运行。
安装Kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
Apache Kafka的Golang包
您可以使用go get安装confluent-kafka-go包:
go get -u github.com/confluentinc/confluent-kafka-go/kafka
安装后,您可以在Go代码中导入并使用confluent-kafka-go。
package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})if err != nil {fmt.Printf("创建生产者失败: %s\n", err)return}// 生产消息到主题,处理交付报告等。// 使用后记得关闭生产者defer p.Close()
}
构建生产者
Kafka生产者是Apache Kafka生态系统中的一个关键组成部分,作为一个客户端应用程序,负责向Kafka集群发布(写入)事件。这一部分提供了关于Kafka生产者的全面概述,以及针对调整其行为的配置设置的初步探讨。
下面是一个Golang应用程序的示例,它生产数据并将其发布到Kafka的具体topic。它还说明了如何在Golang中为Kafka消息序列化数据,并演示了如何处理错误和重试。
package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)const (kafkaBroker = "localhost:9092"topic = "test-topic"
)type Messagestruct {Key string `json:"key"`Value string `json:"value"`
}func main() {// 创建一个新的Kafka生产者p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})if err != nil {fmt.Printf("创建生产者失败: %s\n", err)return}defer p.Close()// 定义要发送的消息message := Message{Key: "example_key",Value: "Hello, Kafka!",}// 序列化消息serializedMessage, err := serializeMessage(message)if err != nil {fmt.Printf("消息序列化失败: %s\n", err)return}// 将消息生产到Kafka主题err = produceMessage(p, topic, serializedMessage)if err != nil {fmt.Printf("消息生产失败: %s\n", err)return}fmt.Println("消息成功生产!")
}func serializeMessage(message Message) ([]byte, error) {// 将消息结构体序列化为JSONserialized, err := json.Marshal(message)if err != nil {return nil, fmt.Errorf("消息序列化失败: %w", err)}return serialized, nil
}func produceMessage(p *kafka.Producer, topic string, message []byte) error {// 创建一个新的要生产的Kafka消息kafkaMessage := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value: message,}// 生产Kafka消息deliveryChan := make(chan kafka.Event)err := p.Produce(kafkaMessage, deliveryChan)if err != nil {return fmt.Errorf("消息生产失败: %w", err)}// 等待交付报告或错误e := <-deliveryChanm := e.(*kafka.Message)// 检查交付错误,即生成者方确保发送到Broker的消息不丢失// 但可能重复发送,如实际发成功了,但是Broker的ACK返回给生产者时出现了网络Error// 从而重试后,导致消息重复发送,这时候需要下游做好幂等处理if m.TopicPartition.Error != nil {return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)}// 关闭交付频道close(deliveryChan)return nil
}
步骤解释:
-
创建一个
Kafka生产者。 -
使用
json.Marshal函数将自定义消息结构体(Message)序列化为JSON。 -
使用生产者将序列化的消息生产到
Kafka topic。 -
使用交付报告和错误检查处理错误和重试。
确保将localhost:9092替换为您的Kafka代理地址,将test-topic替换为所需的主题名称。此外,您可能需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑。
构建消费者
Kafka消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探讨这些消费者的内部工作原理和调整其性能的配置。
下面是一个Golang应用程序的示例,它从Kafka主题消费消息。如下代码包括了如何处理消费到的消息的说明,以及对不同消费模式(如单个消费者和消费者组)的讨论。
package mainimport ("fmt""os""os/signal""github.com/confluentinc/confluent-kafka-go/kafka"
)const (kafkaBroker = "localhost:9092"topic = "test-topic"groupID = "test-group"
)func main() {// 创建一个新的Kafka消费者c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker,"group.id": groupID, // 消费者组标识"auto.offset.reset": "earliest", // 从头开始消费})if err != nil {fmt.Printf("创建消费者失败: %s\n", err)return}defer c.Close()// 订阅Kafka主题err = c.SubscribeTopics([]string{topic}, nil)if err != nil {fmt.Printf("订阅主题失败: %s\n", err)return}// 设置一个通道来处理操作系统信号,以便优雅地关闭sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, os.Interrupt)// 开始消费消息run := truefor run == true {select {case sig := <-sigchan:fmt.Printf("接收到信号 %v: 正在终止\n", sig)run = falsedefault:// 轮询Kafka消息,1次最多拉取100条消息ev := c.Poll(100) if ev == nil {continue}switch e := ev.(type) {case *kafka.Message:// 处理消费的消息fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))case kafka.Error:// 处理Kafka错误fmt.Printf("错误: %v\n", e)}}}
}
步骤解释:
-
创建一个
Kafka消费者。 -
订阅一个
Kafka主题。 -
设置一个通道来处理操作系统信号(如
SIGINT)以优雅地关闭。 -
开始从订阅的
Topic消费消息。 -
处理消费的消息以及
Kafka错误。
不同的消费模式:
-
单个消费者:在这种模式下,单个消费者实例从主题的一个或多个分区读取消息。当您只需要一个消费者应用程序实例来处理来自
Topic的所有消息时,这很有用。 -
消费者组:消费者组允许您通过将消息处理分布到多个消费者实例来扩展消费,以实现扩展。每个消费者组可以有多个消费者,组内的每个消费者从一部分分区读取消息。这使得消息的并行处理成为可能,提供了容错能力和高吞吐量。
在提供的示例中,group.id配置设置用于指定消费者组ID。这允许消费者应用程序的多个实例在消费者组中一起工作,从Kafka Topic消费消息。
总结
总之,Apache Kafka作为构建实时数据管道和流应用程序的强大解决方案,得益于其分布式、可扩展和容错的架构。当与Golang结合时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常适合现代应用程序。通过利用Kafka的功能和Golang的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用程序,这些应用程序可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成不同的系统还是聚合日志,Kafka和Golang提供了一个强势组合,使开发人员能够轻松构建创新和可扩展的解决方案。
相关文章:
142. Go操作Kafka(confluent-kafka-go库)
文章目录 Apache kafka简介开始使用Apache Kafka构建生产者构建消费者 总结 之前已经有两篇文章介绍过 Go如何操作 kafka 28.windows安装kafka,Go操作kafka示例(sarama库) 51.Go操作kafka示例(kafka-go库) Apache ka…...
spring boot(学习笔记第十九课)
spring boot(学习笔记第十九课) Spring boot的batch框架,以及Swagger3(OpenAPI)整合 学习内容: Spring boot的batch框架Spring boot的Swagger3(OpenAPI)整合 1. Spring boot batch框架 Spring Batch是什么 Spring Batch 是一个…...
docker安装 redis 并且加密开启SSL/TLS通道
拉取镜像 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/redis:latest docker tag registry.cn-hangzhou.aliyuncs.com/qiluo-images/redis:latest redis:latest要在 Docker 容器中启动 Redis 并开启 SSL/TLS 加密,需按照以下步骤修改启动命令和配置…...
什么是ARM架构?什么是X86架构?两者的区别是什么?
一、什么是ARM架构 (一)起源于发展 ARM 架构由英国剑桥的 Acorn 计算机公司开发。因市场无合适产品,Acorn 自行设计出第一款微处理器,命名为 ARM。此后 ARM 架构不断发展,1990 年为与苹果合作成立 ARM 公司࿰…...
【vscode】vscode paste image插件设置
本文首发于 ❄️慕雪的寒舍 vscode编辑md文件的时候,如果想插入图片,自带的粘贴只会粘贴到当前目录下,也没有文件重命名,很不友好。 在扩展商店里面有mushan的Paste Image插件,相比自带的,更加友好一点。但…...
自定义string类
#include <iostream> #include <string> int main() { std::string str "Hello, World!"; // 使用 c_str() 将 std::string 转换为 C 风格字符串,并传递给 printf printf("The string is: %s\n", str.c_str()); // 尝试修改…...
Python | Leetcode Python题解之第387题字符串中的第一个唯一字符
题目: 题解: class Solution:def firstUniqChar(self, s: str) -> int:position dict()q collections.deque()n len(s)for i, ch in enumerate(s):if ch not in position:position[ch] iq.append((s[i], i))else:position[ch] -1while q and po…...
RocketMQ 消费时序列化报错问题分析及解决
问题背景 在2024年3月7日,系统消费 RocketMQ 消息时出现了序列化报错,错误信息显示为: java.io.InvalidClassException: com.xxx.xxx.bean.mg.GoodsChangeLogMessage; local class incompatible: stream classdesc serialVersionUID... 这是…...
全能与专精:探索未来AI模型的发展趋势与市场潜力
文章目录 每日一句正能量前言AI模型的全面评估和比较AI模型的专精化和可扩展性AI模型的合理使用和道德规范后记 每日一句正能量 一个人,如果没有经受过投资失败的痛楚,又怎么会看到绝望之后的海阔天空。很多时候,经历了人生中最艰难的事&…...
Python深度学习:【开源数据集系列】ImageNet数据集
ImageNet 是一个大规模的视觉数据集,是计算机视觉领域最重要的基准数据集之一。该数据集由普林斯顿大学和斯坦福大学的研究人员发起,于 2009 年推出。ImageNet 是用于物体分类、目标检测、图像分割、姿势估计等多种任务的通用数据集,尤其在深度学习和计算机视觉的突破性研究…...
微信小程序手写签名
微信小程序手写签名组件 该组件基于signature_pad封装,signature_pad本身是web端的插件,此处将插件代码修改为小程序端可用。 signature_pad.js /*!* Signature Pad v5.0.3 | https://github.com/szimek/signature_pad* (c) 2024 Szymon Nowak | Releas…...
Javascript 使用中点查找矩形的角(Find Corners of Rectangle using mid points)
考虑一个矩形 ABCD,我们给出了边 AD 和 BC 中点(分别为 p 和 q)的坐标以及它们的长度 L(AD BC L)。现在给定参数,我们需要打印 4 个点 A、B、C 和 D 的坐标。 例子: 输入:p (1,…...
【困难】 猿人学web第一届 第18题 jsvmp 洞察先机
文章目录 数据接口分析还原加密参数插桩调试分析日志插桩补充 python 代码 数据接口分析 数据接口 https://match.yuanrenxue.cn/match/18data 请求参数 {page: 页码, t: 时间戳, v: 加密值} 请求第一页不需要携带 t, v 参数 cookie 只需要携带 sessionid 只要 还原加密字段…...
IDEA Maven 源修改为国内阿里云镜像的正确方式
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storm…...
OpenCV 旋转矩形边界
边界矩形是用最小面积绘制的,所以它也考虑了旋转。使用的函数是**cv.minAreaRect**()。 import cv2 import numpy as npimgcv2.imread(rD:\PythonProject\thunder.jpg) img1cv2.cvtColor(img,cv2.COLOR_BGR2GRAY) print(img.dtype) ret,threshcv2.threshold(img1,1…...
人车防撞系统安全生产方案
根据《市场监管总局关于2021~2023年全国特种设备安全状况的通告》数据显示:2023年:全国共发生特种设备事故和相关事故71起,其中死亡69人。包含叉车在内的场(厂)内专用机动车辆事故29起、死亡28人,占事故总数的40.85%、死亡人数的4…...
开放式耳机哪个牌子好?长文传授6招秘籍,彻底远离坑货!
大家好,作为一位专注于评测各类数码产品的博主,今天我特别推荐开放式耳机作为我们日常的首选。这种耳机以其独特的设计,避免了传统耳机长时间佩戴可能带来的不适和感染风险。开放式耳机佩戴简便且稳固,尤其适合热爱跑步和运动的…...
vue2和vue3双向绑定的原理
Vue.js 的双向绑定是 Vue 框架的核心特性之一,它允许数据和视图之间保持同步。虽然 Vue 2 和 Vue 3 都实现了双向绑定,但它们在实现细节上有所不同。 Vue 2 双向绑定的原理 在 Vue 2 中,双向绑定主要依赖于 Object.defineProperty 和观察者…...
别为大文件烦恼!mp4文件太大怎么变小?3个管用方法
你是否曾经遇到过mp4视频文件过大的困扰?每当想要分享或存储mp4文件时,巨大的文件就成了阻碍。明明感觉感觉没占用多少空间,但是设备却常常出现空间过满警告。 没多少空间的设备真是让人大为恼火,没人想多花一份钱买设备。那么只…...
cocotb的接收和发送逻辑,还是没有弄明白
发送有两种方式 1、定义这样的发 通过前缀连接DUT里面的信号 发送的时候,通过.去访问就可以 2、如果是AXI总线,可以直接调用cocotb的库文件 AXIS总线可以包含以下的信号 通过这个类,可以产生一个AXIS的一帧数据 类的实现大概如下 然后也…...
Python拉取视频流的性能优化实战
一、背景与挑战在安防监控、直播推流、视频分析等场景中,我们经常需要使用Python拉取网络视频流(RTSP、HLS、HTTP-FLV等)。然而Python并非以高性能著称,面对高码率、多路视频流时,容易遇到:延迟累积&#x…...
Mojo+Python混合编程避坑手册:5个致命安装错误及对应修复命令(附官方源码验证)
第一章:MojoPython混合编程避坑手册:5个致命安装错误及对应修复命令(附官方源码验证) Mojo 是 Modular 官方推出的高性能编程语言,原生兼容 Python 语法,但其工具链对环境依赖极为敏感。初学者在配置 MojoP…...
Curl命令行工具:从基础到高级的全面指南
1. Curl 命令行工具概述curl(Client for URLs)是一个功能强大的命令行工具,用于与各种服务器进行数据传输。作为一名长期与服务器打交道的开发者,我可以负责任地说,curl是每个技术人员工具箱中不可或缺的利器。它支持包…...
RAG——RAG向量数据库原理与常用向量库
目录 一、向量数据库的分类二、为什么需要向量数据库 2.1、什么场景下该选择什么样的数据库2.2、向量数据库的主要优势 三、向量数据库是如何工作的 3.1、向量数据库的核心3.2、 向量数据库的索引结构3.3、向量数据库的搜索机制3.4、向量数据库的工作流程3.5、向量数据库的主要…...
嵌入式C语言设计模式实践:观察者与责任链模式
1. 嵌入式软件开发中的设计模式应用背景在传统认知中,嵌入式系统开发往往与"资源受限"、"底层硬件"、"效率优先"等标签紧密关联。早期的嵌入式设备功能单一,业务逻辑简单,开发者更关注代码的执行效率和硬件资源…...
1.C语言常见概念
目录1.C语言是什么?2.C语言的历史3.编译器的选择-VS2022正文1.C语言是什么?人和计算机是如何交流的?是使用计算机语言。就如同人与人交流使用的自然语言。目前的计算机语言有上千种,C语言就是其中一种,除此之外还有C/J…...
别再只靠瓦片等级了!用Cesium精准控制地图缩放的自定义比例尺方案
突破瓦片等级限制:Cesium动态比例尺的工程实践与业务集成 在三维地理信息系统的开发中,地图缩放控制一直是个既基础又关键的课题。传统依赖预定义瓦片等级的做法,就像用固定档位的变速箱驾驶越野车——虽然简单直接,但面对复杂地形…...
养殖场环境控制系统:远程控制,足不出户管全场
一、应用背景 当前我国畜禽养殖正从传统散户养殖向规模化、集约化转型,而环境因素(温湿度、有害气体、光照等)是影响畜禽生长发育、繁殖效率、疫病防控的核心要素。据行业数据显示,2023年全球智慧农业市场规模达2200亿美元,畜牧养殖环境监控系…...
中文Python游戏开发避坑指南:植物大战僵尸开发中的5个常见问题及解决方案
Python游戏开发实战:植物大战僵尸复刻中的5个关键技术挑战 在游戏开发领域,Python凭借其简洁语法和丰富的库支持,成为许多独立开发者的首选语言。植物大战僵尸作为一款经典的塔防游戏,其核心玩法看似简单,但在实际开发…...
突破网盘限速壁垒:本地化直链解析工具的全方位解决方案
突破网盘限速壁垒:本地化直链解析工具的全方位解决方案 【免费下载链接】Online-disk-direct-link-download-assistant 一个基于 JavaScript 的网盘文件下载地址获取工具。基于【网盘直链下载助手】修改 ,支持 百度网盘 / 阿里云盘 / 中国移动云盘 / 天翼…...
