当前位置: 首页 > news >正文

142. Go操作Kafka(confluent-kafka-go库)

文章目录

  • Apache kafka简介
  • 开始使用Apache Kafka
    • 构建生产者
    • 构建消费者
  • 总结

之前已经有两篇文章介绍过 Go如何操作 kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
51.Go操作kafka示例(kafka-go库)

Apache kafka简介

在这里插入图片描述
Apache Kafka是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键任务应用程序。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种能力。

Kafka的用例和能力

  • 流数据管道: Kafka提供了一个分布式发布-订阅消息系统,可以在系统或应用程序之间流式传输数据。它提供了具有数据复制和容错能力的强大队列。

  • 实时分析:Kafka允许使用工具如Kafka StreamsKSQL处理实时数据流,用于构建流式分析和数据处理应用程序。

  • 数据集成 :Kafka可以用来通过在不同数据源和格式之间流式传输数据来集成不同的系统。这使它对流式ETL非常有用。

  • 事件源 : Kafka提供了可以重放的事件时间日志,用于重构应用程序状态,适用于事件源和CQRS模式。

  • 日志聚合 : Kafka通常用于将不同服务器和应用程序的日志聚合到一个中央存储库中。这允许统一访问日志数据。

为什么将Golang与Apache Kafka结合使用

Golang这一高效并发的编程语言与Apache Kafka这一分布式事件流平台结合起来,提供了一个在构建尖端现代应用程序方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:

  • 性能 : GolangApache Kafka都提供高性能。Golang快速、高效和轻量级。Kafka为速度而构建,具有高吞吐量和低延迟。它们一起可以处理苛刻的工作负载。

  • 可扩展性 : GolanggoroutinesKafka的分区允许应用程序水平扩展以处理大量数据。Kafka可以轻松扩展生产者和消费者。

  • 并发性 : Golang通过goroutineschannels提供了出色的并发编程能力。Kafka并发传递消息并支持并行性。

  • 可用性 : Kafka的分布式架构使其高度可用和容错。Golang应用可以利用这一点来构建弹性系统。

  • 互操作性 : Kafka有多种语言的客户端,允许Golang应用与多语言环境互动。Kafka还使用二进制TCP协议以提高效率。

  • 现代设计 : KafkaGolang都采用现代设计理念,使它们非常适合云原生和微服务架构。

  • 开发人员体验 : Kafka的客户端库结合Goroutines、channels和接口,使其易于使用。

Kafka和Golang将性能、可扩展性和并发与生产力结合在一起 - 使它们成为构建可扩展的服务、管道和流应用程序的绝佳选择。

开始使用Apache Kafka

在开始使用GolangApache Kafka之前,我们必须确保golangKafka已经安装并在我们的机器上运行。

安装Kafka
28.windows安装kafka,Go操作kafka示例(sarama库)

Apache Kafka的Golang包
您可以使用go get安装confluent-kafka-go包:

go get -u github.com/confluentinc/confluent-kafka-go/kafka

安装后,您可以在Go代码中导入并使用confluent-kafka-go

package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})if err != nil {fmt.Printf("创建生产者失败: %s\n", err)return}// 生产消息到主题,处理交付报告等。// 使用后记得关闭生产者defer p.Close()
}

构建生产者

Kafka生产者是Apache Kafka生态系统中的一个关键组成部分,作为一个客户端应用程序,负责向Kafka集群发布(写入)事件。这一部分提供了关于Kafka生产者的全面概述,以及针对调整其行为的配置设置的初步探讨。

下面是一个Golang应用程序的示例,它生产数据并将其发布到Kafka的具体topic。它还说明了如何在Golang中为Kafka消息序列化数据,并演示了如何处理错误和重试。

package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)const (kafkaBroker = "localhost:9092"topic       = "test-topic"
)type Messagestruct {Key   string `json:"key"`Value string `json:"value"`
}func main() {// 创建一个新的Kafka生产者p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})if err != nil {fmt.Printf("创建生产者失败: %s\n", err)return}defer p.Close()// 定义要发送的消息message := Message{Key:   "example_key",Value: "Hello, Kafka!",}// 序列化消息serializedMessage, err := serializeMessage(message)if err != nil {fmt.Printf("消息序列化失败: %s\n", err)return}// 将消息生产到Kafka主题err = produceMessage(p, topic, serializedMessage)if err != nil {fmt.Printf("消息生产失败: %s\n", err)return}fmt.Println("消息成功生产!")
}func serializeMessage(message Message) ([]byte, error) {// 将消息结构体序列化为JSONserialized, err := json.Marshal(message)if err != nil {return nil, fmt.Errorf("消息序列化失败: %w", err)}return serialized, nil
}func produceMessage(p *kafka.Producer, topic string, message []byte) error {// 创建一个新的要生产的Kafka消息kafkaMessage := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          message,}// 生产Kafka消息deliveryChan := make(chan kafka.Event)err := p.Produce(kafkaMessage, deliveryChan)if err != nil {return fmt.Errorf("消息生产失败: %w", err)}// 等待交付报告或错误e := <-deliveryChanm := e.(*kafka.Message)// 检查交付错误,即生成者方确保发送到Broker的消息不丢失// 但可能重复发送,如实际发成功了,但是Broker的ACK返回给生产者时出现了网络Error// 从而重试后,导致消息重复发送,这时候需要下游做好幂等处理if m.TopicPartition.Error != nil {return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)}// 关闭交付频道close(deliveryChan)return nil
}

步骤解释:

  1. 创建一个Kafka生产者。

  2. 使用json.Marshal函数将自定义消息结构体(Message)序列化为JSON

  3. 使用生产者将序列化的消息生产到Kafka topic

  4. 使用交付报告和错误检查处理错误和重试。

确保将localhost:9092替换为您的Kafka代理地址,将test-topic替换为所需的主题名称。此外,您可能需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑

构建消费者

Kafka消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探讨这些消费者的内部工作原理和调整其性能的配置。

下面是一个Golang应用程序的示例,它从Kafka主题消费消息。如下代码包括了如何处理消费到的消息的说明,以及对不同消费模式(如单个消费者和消费者组)的讨论。

package mainimport ("fmt""os""os/signal""github.com/confluentinc/confluent-kafka-go/kafka"
)const (kafkaBroker = "localhost:9092"topic       = "test-topic"groupID     = "test-group"
)func main() {// 创建一个新的Kafka消费者c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers":  kafkaBroker,"group.id":           groupID, // 消费者组标识"auto.offset.reset":  "earliest", // 从头开始消费})if err != nil {fmt.Printf("创建消费者失败: %s\n", err)return}defer c.Close()// 订阅Kafka主题err = c.SubscribeTopics([]string{topic}, nil)if err != nil {fmt.Printf("订阅主题失败: %s\n", err)return}// 设置一个通道来处理操作系统信号,以便优雅地关闭sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, os.Interrupt)// 开始消费消息run := truefor run == true {select {case sig := <-sigchan:fmt.Printf("接收到信号 %v: 正在终止\n", sig)run = falsedefault:// 轮询Kafka消息,1次最多拉取100条消息ev := c.Poll(100) if ev == nil {continue}switch e := ev.(type) {case *kafka.Message:// 处理消费的消息fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))case kafka.Error:// 处理Kafka错误fmt.Printf("错误: %v\n", e)}}}
}

步骤解释

  1. 创建一个Kafka消费者。

  2. 订阅一个Kafka主题。

  3. 设置一个通道来处理操作系统信号(如SIGINT)以优雅地关闭。

  4. 开始从订阅的Topic消费消息。

  5. 处理消费的消息以及Kafka错误。

不同的消费模式:

  • 单个消费者:在这种模式下,单个消费者实例从主题的一个或多个分区读取消息。当您只需要一个消费者应用程序实例来处理来自Topic的所有消息时,这很有用。

  • 消费者组:消费者组允许您通过将消息处理分布到多个消费者实例来扩展消费,以实现扩展。每个消费者组可以有多个消费者,组内的每个消费者从一部分分区读取消息。这使得消息的并行处理成为可能,提供了容错能力和高吞吐量。

在提供的示例中,group.id配置设置用于指定消费者组ID。这允许消费者应用程序的多个实例在消费者组中一起工作,从Kafka Topic消费消息。

总结

总之,Apache Kafka作为构建实时数据管道和流应用程序的强大解决方案,得益于其分布式、可扩展和容错的架构。当与Golang结合时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常适合现代应用程序。通过利用Kafka的功能和Golang的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用程序,这些应用程序可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成不同的系统还是聚合日志,KafkaGolang提供了一个强势组合,使开发人员能够轻松构建创新和可扩展的解决方案。

相关文章:

142. Go操作Kafka(confluent-kafka-go库)

文章目录 Apache kafka简介开始使用Apache Kafka构建生产者构建消费者 总结 之前已经有两篇文章介绍过 Go如何操作 kafka 28.windows安装kafka&#xff0c;Go操作kafka示例&#xff08;sarama库&#xff09; 51.Go操作kafka示例&#xff08;kafka-go库&#xff09; Apache ka…...

spring boot(学习笔记第十九课)

spring boot(学习笔记第十九课) Spring boot的batch框架&#xff0c;以及Swagger3(OpenAPI)整合 学习内容&#xff1a; Spring boot的batch框架Spring boot的Swagger3&#xff08;OpenAPI&#xff09;整合 1. Spring boot batch框架 Spring Batch是什么 Spring Batch 是一个…...

docker安装 redis 并且加密开启SSL/TLS通道

拉取镜像 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/redis:latest docker tag registry.cn-hangzhou.aliyuncs.com/qiluo-images/redis:latest redis:latest要在 Docker 容器中启动 Redis 并开启 SSL/TLS 加密&#xff0c;需按照以下步骤修改启动命令和配置…...

什么是ARM架构?什么是X86架构?两者的区别是什么?

一、什么是ARM架构 &#xff08;一&#xff09;起源于发展 ARM 架构由英国剑桥的 Acorn 计算机公司开发。因市场无合适产品&#xff0c;Acorn 自行设计出第一款微处理器&#xff0c;命名为 ARM。此后 ARM 架构不断发展&#xff0c;1990 年为与苹果合作成立 ARM 公司&#xff0…...

【vscode】vscode paste image插件设置

本文首发于 ❄️慕雪的寒舍 vscode编辑md文件的时候&#xff0c;如果想插入图片&#xff0c;自带的粘贴只会粘贴到当前目录下&#xff0c;也没有文件重命名&#xff0c;很不友好。 在扩展商店里面有mushan的Paste Image插件&#xff0c;相比自带的&#xff0c;更加友好一点。但…...

自定义string类

#include <iostream> #include <string> int main() { std::string str "Hello, World!"; // 使用 c_str() 将 std::string 转换为 C 风格字符串&#xff0c;并传递给 printf printf("The string is: %s\n", str.c_str()); // 尝试修改…...

Python | Leetcode Python题解之第387题字符串中的第一个唯一字符

题目&#xff1a; 题解&#xff1a; class Solution:def firstUniqChar(self, s: str) -> int:position dict()q collections.deque()n len(s)for i, ch in enumerate(s):if ch not in position:position[ch] iq.append((s[i], i))else:position[ch] -1while q and po…...

RocketMQ 消费时序列化报错问题分析及解决

问题背景 在2024年3月7日&#xff0c;系统消费 RocketMQ 消息时出现了序列化报错&#xff0c;错误信息显示为&#xff1a; java.io.InvalidClassException: com.xxx.xxx.bean.mg.GoodsChangeLogMessage; local class incompatible: stream classdesc serialVersionUID... 这是…...

全能与专精:探索未来AI模型的发展趋势与市场潜力

文章目录 每日一句正能量前言AI模型的全面评估和比较AI模型的专精化和可扩展性AI模型的合理使用和道德规范后记 每日一句正能量 一个人&#xff0c;如果没有经受过投资失败的痛楚&#xff0c;又怎么会看到绝望之后的海阔天空。很多时候&#xff0c;经历了人生中最艰难的事&…...

Python深度学习:【开源数据集系列】ImageNet数据集

ImageNet 是一个大规模的视觉数据集,是计算机视觉领域最重要的基准数据集之一。该数据集由普林斯顿大学和斯坦福大学的研究人员发起,于 2009 年推出。ImageNet 是用于物体分类、目标检测、图像分割、姿势估计等多种任务的通用数据集,尤其在深度学习和计算机视觉的突破性研究…...

微信小程序手写签名

微信小程序手写签名组件 该组件基于signature_pad封装&#xff0c;signature_pad本身是web端的插件&#xff0c;此处将插件代码修改为小程序端可用。 signature_pad.js /*!* Signature Pad v5.0.3 | https://github.com/szimek/signature_pad* (c) 2024 Szymon Nowak | Releas…...

Javascript 使用中点查找矩形的角(Find Corners of Rectangle using mid points)

考虑一个矩形 ABCD&#xff0c;我们给出了边 AD 和 BC 中点&#xff08;分别为 p 和 q&#xff09;的坐标以及它们的长度 L&#xff08;AD BC L&#xff09;。现在给定参数&#xff0c;我们需要打印 4 个点 A、B、C 和 D 的坐标。 例子&#xff1a; 输入&#xff1a;p (1,…...

【困难】 猿人学web第一届 第18题 jsvmp 洞察先机

文章目录 数据接口分析还原加密参数插桩调试分析日志插桩补充 python 代码 数据接口分析 数据接口 https://match.yuanrenxue.cn/match/18data 请求参数 {page: 页码, t: 时间戳, v: 加密值} 请求第一页不需要携带 t, v 参数 cookie 只需要携带 sessionid 只要 还原加密字段…...

IDEA Maven 源修改为国内阿里云镜像的正确方式

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storm…...

OpenCV 旋转矩形边界

边界矩形是用最小面积绘制的&#xff0c;所以它也考虑了旋转。使用的函数是**cv.minAreaRect**()。 import cv2 import numpy as npimgcv2.imread(rD:\PythonProject\thunder.jpg) img1cv2.cvtColor(img,cv2.COLOR_BGR2GRAY) print(img.dtype) ret,threshcv2.threshold(img1,1…...

人车防撞系统安全生产方案

根据《市场监管总局关于2021~2023年全国特种设备安全状况的通告》数据显示&#xff1a;2023年&#xff1a;全国共发生特种设备事故和相关事故71起&#xff0c;其中死亡69人。包含叉车在内的场(厂)内专用机动车辆事故29起、死亡28人&#xff0c;占事故总数的40.85%、死亡人数的4…...

开放式耳机哪个牌子好?长文传授6招秘籍,彻底远离坑货!

​大家好&#xff0c;作为一位专注于评测各类数码产品的博主&#xff0c;今天我特别推荐开放式耳机作为我们日常的首选。这种耳机以其独特的设计&#xff0c;避免了传统耳机长时间佩戴可能带来的不适和感染风险。开放式耳机佩戴简便且稳固&#xff0c;尤其适合热爱跑步和运动的…...

vue2和vue3双向绑定的原理

Vue.js 的双向绑定是 Vue 框架的核心特性之一&#xff0c;它允许数据和视图之间保持同步。虽然 Vue 2 和 Vue 3 都实现了双向绑定&#xff0c;但它们在实现细节上有所不同。 Vue 2 双向绑定的原理 在 Vue 2 中&#xff0c;双向绑定主要依赖于 Object.defineProperty 和观察者…...

别为大文件烦恼!mp4文件太大怎么变小?3个管用方法

你是否曾经遇到过mp4视频文件过大的困扰&#xff1f;每当想要分享或存储mp4文件时&#xff0c;巨大的文件就成了阻碍。明明感觉感觉没占用多少空间&#xff0c;但是设备却常常出现空间过满警告。 没多少空间的设备真是让人大为恼火&#xff0c;没人想多花一份钱买设备。那么只…...

cocotb的接收和发送逻辑,还是没有弄明白

发送有两种方式 1、定义这样的发 通过前缀连接DUT里面的信号 发送的时候&#xff0c;通过.去访问就可以 2、如果是AXI总线&#xff0c;可以直接调用cocotb的库文件 AXIS总线可以包含以下的信号 通过这个类&#xff0c;可以产生一个AXIS的一帧数据 类的实现大概如下 然后也…...

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…...

stm32G473的flash模式是单bank还是双bank?

今天突然有人stm32G473的flash模式是单bank还是双bank&#xff1f;由于时间太久&#xff0c;我真忘记了。搜搜发现&#xff0c;还真有人和我一样。见下面的链接&#xff1a;https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...

51c自动驾驶~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留&#xff0c;CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制&#xff08;CCA-Attention&#xff09;&#xff0c;…...

FFmpeg 低延迟同屏方案

引言 在实时互动需求激增的当下&#xff0c;无论是在线教育中的师生同屏演示、远程办公的屏幕共享协作&#xff0c;还是游戏直播的画面实时传输&#xff0c;低延迟同屏已成为保障用户体验的核心指标。FFmpeg 作为一款功能强大的多媒体框架&#xff0c;凭借其灵活的编解码、数据…...

【机器视觉】单目测距——运动结构恢复

ps&#xff1a;图是随便找的&#xff0c;为了凑个封面 前言 在前面对光流法进行进一步改进&#xff0c;希望将2D光流推广至3D场景流时&#xff0c;发现2D转3D过程中存在尺度歧义问题&#xff0c;需要补全摄像头拍摄图像中缺失的深度信息&#xff0c;否则解空间不收敛&#xf…...

第一篇:Agent2Agent (A2A) 协议——协作式人工智能的黎明

AI 领域的快速发展正在催生一个新时代&#xff0c;智能代理&#xff08;agents&#xff09;不再是孤立的个体&#xff0c;而是能够像一个数字团队一样协作。然而&#xff0c;当前 AI 生态系统的碎片化阻碍了这一愿景的实现&#xff0c;导致了“AI 巴别塔问题”——不同代理之间…...

2025盘古石杯决赛【手机取证】

前言 第三届盘古石杯国际电子数据取证大赛决赛 最后一题没有解出来&#xff0c;实在找不到&#xff0c;希望有大佬教一下我。 还有就会议时间&#xff0c;我感觉不是图片时间&#xff0c;因为在电脑看到是其他时间用老会议系统开的会。 手机取证 1、分析鸿蒙手机检材&#x…...

Spring AI与Spring Modulith核心技术解析

Spring AI核心架构解析 Spring AI&#xff08;https://spring.io/projects/spring-ai&#xff09;作为Spring生态中的AI集成框架&#xff0c;其核心设计理念是通过模块化架构降低AI应用的开发复杂度。与Python生态中的LangChain/LlamaIndex等工具类似&#xff0c;但特别为多语…...

10-Oracle 23 ai Vector Search 概述和参数

一、Oracle AI Vector Search 概述 企业和个人都在尝试各种AI&#xff0c;使用客户端或是内部自己搭建集成大模型的终端&#xff0c;加速与大型语言模型&#xff08;LLM&#xff09;的结合&#xff0c;同时使用检索增强生成&#xff08;Retrieval Augmented Generation &#…...

人工智能(大型语言模型 LLMs)对不同学科的影响以及由此产生的新学习方式

今天是关于AI如何在教学中增强学生的学习体验&#xff0c;我把重要信息标红了。人文学科的价值被低估了 ⬇️ 转型与必要性 人工智能正在深刻地改变教育&#xff0c;这并非炒作&#xff0c;而是已经发生的巨大变革。教育机构和教育者不能忽视它&#xff0c;试图简单地禁止学生使…...