当前位置: 首页 > news >正文

142. Go操作Kafka(confluent-kafka-go库)

文章目录

  • Apache kafka简介
  • 开始使用Apache Kafka
    • 构建生产者
    • 构建消费者
  • 总结

之前已经有两篇文章介绍过 Go如何操作 kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
51.Go操作kafka示例(kafka-go库)

Apache kafka简介

在这里插入图片描述
Apache Kafka是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键任务应用程序。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种能力。

Kafka的用例和能力

  • 流数据管道: Kafka提供了一个分布式发布-订阅消息系统,可以在系统或应用程序之间流式传输数据。它提供了具有数据复制和容错能力的强大队列。

  • 实时分析:Kafka允许使用工具如Kafka StreamsKSQL处理实时数据流,用于构建流式分析和数据处理应用程序。

  • 数据集成 :Kafka可以用来通过在不同数据源和格式之间流式传输数据来集成不同的系统。这使它对流式ETL非常有用。

  • 事件源 : Kafka提供了可以重放的事件时间日志,用于重构应用程序状态,适用于事件源和CQRS模式。

  • 日志聚合 : Kafka通常用于将不同服务器和应用程序的日志聚合到一个中央存储库中。这允许统一访问日志数据。

为什么将Golang与Apache Kafka结合使用

Golang这一高效并发的编程语言与Apache Kafka这一分布式事件流平台结合起来,提供了一个在构建尖端现代应用程序方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:

  • 性能 : GolangApache Kafka都提供高性能。Golang快速、高效和轻量级。Kafka为速度而构建,具有高吞吐量和低延迟。它们一起可以处理苛刻的工作负载。

  • 可扩展性 : GolanggoroutinesKafka的分区允许应用程序水平扩展以处理大量数据。Kafka可以轻松扩展生产者和消费者。

  • 并发性 : Golang通过goroutineschannels提供了出色的并发编程能力。Kafka并发传递消息并支持并行性。

  • 可用性 : Kafka的分布式架构使其高度可用和容错。Golang应用可以利用这一点来构建弹性系统。

  • 互操作性 : Kafka有多种语言的客户端,允许Golang应用与多语言环境互动。Kafka还使用二进制TCP协议以提高效率。

  • 现代设计 : KafkaGolang都采用现代设计理念,使它们非常适合云原生和微服务架构。

  • 开发人员体验 : Kafka的客户端库结合Goroutines、channels和接口,使其易于使用。

Kafka和Golang将性能、可扩展性和并发与生产力结合在一起 - 使它们成为构建可扩展的服务、管道和流应用程序的绝佳选择。

开始使用Apache Kafka

在开始使用GolangApache Kafka之前,我们必须确保golangKafka已经安装并在我们的机器上运行。

安装Kafka
28.windows安装kafka,Go操作kafka示例(sarama库)

Apache Kafka的Golang包
您可以使用go get安装confluent-kafka-go包:

go get -u github.com/confluentinc/confluent-kafka-go/kafka

安装后,您可以在Go代码中导入并使用confluent-kafka-go

package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})if err != nil {fmt.Printf("创建生产者失败: %s\n", err)return}// 生产消息到主题,处理交付报告等。// 使用后记得关闭生产者defer p.Close()
}

构建生产者

Kafka生产者是Apache Kafka生态系统中的一个关键组成部分,作为一个客户端应用程序,负责向Kafka集群发布(写入)事件。这一部分提供了关于Kafka生产者的全面概述,以及针对调整其行为的配置设置的初步探讨。

下面是一个Golang应用程序的示例,它生产数据并将其发布到Kafka的具体topic。它还说明了如何在Golang中为Kafka消息序列化数据,并演示了如何处理错误和重试。

package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)const (kafkaBroker = "localhost:9092"topic       = "test-topic"
)type Messagestruct {Key   string `json:"key"`Value string `json:"value"`
}func main() {// 创建一个新的Kafka生产者p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})if err != nil {fmt.Printf("创建生产者失败: %s\n", err)return}defer p.Close()// 定义要发送的消息message := Message{Key:   "example_key",Value: "Hello, Kafka!",}// 序列化消息serializedMessage, err := serializeMessage(message)if err != nil {fmt.Printf("消息序列化失败: %s\n", err)return}// 将消息生产到Kafka主题err = produceMessage(p, topic, serializedMessage)if err != nil {fmt.Printf("消息生产失败: %s\n", err)return}fmt.Println("消息成功生产!")
}func serializeMessage(message Message) ([]byte, error) {// 将消息结构体序列化为JSONserialized, err := json.Marshal(message)if err != nil {return nil, fmt.Errorf("消息序列化失败: %w", err)}return serialized, nil
}func produceMessage(p *kafka.Producer, topic string, message []byte) error {// 创建一个新的要生产的Kafka消息kafkaMessage := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          message,}// 生产Kafka消息deliveryChan := make(chan kafka.Event)err := p.Produce(kafkaMessage, deliveryChan)if err != nil {return fmt.Errorf("消息生产失败: %w", err)}// 等待交付报告或错误e := <-deliveryChanm := e.(*kafka.Message)// 检查交付错误,即生成者方确保发送到Broker的消息不丢失// 但可能重复发送,如实际发成功了,但是Broker的ACK返回给生产者时出现了网络Error// 从而重试后,导致消息重复发送,这时候需要下游做好幂等处理if m.TopicPartition.Error != nil {return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)}// 关闭交付频道close(deliveryChan)return nil
}

步骤解释:

  1. 创建一个Kafka生产者。

  2. 使用json.Marshal函数将自定义消息结构体(Message)序列化为JSON

  3. 使用生产者将序列化的消息生产到Kafka topic

  4. 使用交付报告和错误检查处理错误和重试。

确保将localhost:9092替换为您的Kafka代理地址,将test-topic替换为所需的主题名称。此外,您可能需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑

构建消费者

Kafka消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探讨这些消费者的内部工作原理和调整其性能的配置。

下面是一个Golang应用程序的示例,它从Kafka主题消费消息。如下代码包括了如何处理消费到的消息的说明,以及对不同消费模式(如单个消费者和消费者组)的讨论。

package mainimport ("fmt""os""os/signal""github.com/confluentinc/confluent-kafka-go/kafka"
)const (kafkaBroker = "localhost:9092"topic       = "test-topic"groupID     = "test-group"
)func main() {// 创建一个新的Kafka消费者c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers":  kafkaBroker,"group.id":           groupID, // 消费者组标识"auto.offset.reset":  "earliest", // 从头开始消费})if err != nil {fmt.Printf("创建消费者失败: %s\n", err)return}defer c.Close()// 订阅Kafka主题err = c.SubscribeTopics([]string{topic}, nil)if err != nil {fmt.Printf("订阅主题失败: %s\n", err)return}// 设置一个通道来处理操作系统信号,以便优雅地关闭sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, os.Interrupt)// 开始消费消息run := truefor run == true {select {case sig := <-sigchan:fmt.Printf("接收到信号 %v: 正在终止\n", sig)run = falsedefault:// 轮询Kafka消息,1次最多拉取100条消息ev := c.Poll(100) if ev == nil {continue}switch e := ev.(type) {case *kafka.Message:// 处理消费的消息fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))case kafka.Error:// 处理Kafka错误fmt.Printf("错误: %v\n", e)}}}
}

步骤解释

  1. 创建一个Kafka消费者。

  2. 订阅一个Kafka主题。

  3. 设置一个通道来处理操作系统信号(如SIGINT)以优雅地关闭。

  4. 开始从订阅的Topic消费消息。

  5. 处理消费的消息以及Kafka错误。

不同的消费模式:

  • 单个消费者:在这种模式下,单个消费者实例从主题的一个或多个分区读取消息。当您只需要一个消费者应用程序实例来处理来自Topic的所有消息时,这很有用。

  • 消费者组:消费者组允许您通过将消息处理分布到多个消费者实例来扩展消费,以实现扩展。每个消费者组可以有多个消费者,组内的每个消费者从一部分分区读取消息。这使得消息的并行处理成为可能,提供了容错能力和高吞吐量。

在提供的示例中,group.id配置设置用于指定消费者组ID。这允许消费者应用程序的多个实例在消费者组中一起工作,从Kafka Topic消费消息。

总结

总之,Apache Kafka作为构建实时数据管道和流应用程序的强大解决方案,得益于其分布式、可扩展和容错的架构。当与Golang结合时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常适合现代应用程序。通过利用Kafka的功能和Golang的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用程序,这些应用程序可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成不同的系统还是聚合日志,KafkaGolang提供了一个强势组合,使开发人员能够轻松构建创新和可扩展的解决方案。

相关文章:

142. Go操作Kafka(confluent-kafka-go库)

文章目录 Apache kafka简介开始使用Apache Kafka构建生产者构建消费者 总结 之前已经有两篇文章介绍过 Go如何操作 kafka 28.windows安装kafka&#xff0c;Go操作kafka示例&#xff08;sarama库&#xff09; 51.Go操作kafka示例&#xff08;kafka-go库&#xff09; Apache ka…...

spring boot(学习笔记第十九课)

spring boot(学习笔记第十九课) Spring boot的batch框架&#xff0c;以及Swagger3(OpenAPI)整合 学习内容&#xff1a; Spring boot的batch框架Spring boot的Swagger3&#xff08;OpenAPI&#xff09;整合 1. Spring boot batch框架 Spring Batch是什么 Spring Batch 是一个…...

docker安装 redis 并且加密开启SSL/TLS通道

拉取镜像 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/redis:latest docker tag registry.cn-hangzhou.aliyuncs.com/qiluo-images/redis:latest redis:latest要在 Docker 容器中启动 Redis 并开启 SSL/TLS 加密&#xff0c;需按照以下步骤修改启动命令和配置…...

什么是ARM架构?什么是X86架构?两者的区别是什么?

一、什么是ARM架构 &#xff08;一&#xff09;起源于发展 ARM 架构由英国剑桥的 Acorn 计算机公司开发。因市场无合适产品&#xff0c;Acorn 自行设计出第一款微处理器&#xff0c;命名为 ARM。此后 ARM 架构不断发展&#xff0c;1990 年为与苹果合作成立 ARM 公司&#xff0…...

【vscode】vscode paste image插件设置

本文首发于 ❄️慕雪的寒舍 vscode编辑md文件的时候&#xff0c;如果想插入图片&#xff0c;自带的粘贴只会粘贴到当前目录下&#xff0c;也没有文件重命名&#xff0c;很不友好。 在扩展商店里面有mushan的Paste Image插件&#xff0c;相比自带的&#xff0c;更加友好一点。但…...

自定义string类

#include <iostream> #include <string> int main() { std::string str "Hello, World!"; // 使用 c_str() 将 std::string 转换为 C 风格字符串&#xff0c;并传递给 printf printf("The string is: %s\n", str.c_str()); // 尝试修改…...

Python | Leetcode Python题解之第387题字符串中的第一个唯一字符

题目&#xff1a; 题解&#xff1a; class Solution:def firstUniqChar(self, s: str) -> int:position dict()q collections.deque()n len(s)for i, ch in enumerate(s):if ch not in position:position[ch] iq.append((s[i], i))else:position[ch] -1while q and po…...

RocketMQ 消费时序列化报错问题分析及解决

问题背景 在2024年3月7日&#xff0c;系统消费 RocketMQ 消息时出现了序列化报错&#xff0c;错误信息显示为&#xff1a; java.io.InvalidClassException: com.xxx.xxx.bean.mg.GoodsChangeLogMessage; local class incompatible: stream classdesc serialVersionUID... 这是…...

全能与专精:探索未来AI模型的发展趋势与市场潜力

文章目录 每日一句正能量前言AI模型的全面评估和比较AI模型的专精化和可扩展性AI模型的合理使用和道德规范后记 每日一句正能量 一个人&#xff0c;如果没有经受过投资失败的痛楚&#xff0c;又怎么会看到绝望之后的海阔天空。很多时候&#xff0c;经历了人生中最艰难的事&…...

Python深度学习:【开源数据集系列】ImageNet数据集

ImageNet 是一个大规模的视觉数据集,是计算机视觉领域最重要的基准数据集之一。该数据集由普林斯顿大学和斯坦福大学的研究人员发起,于 2009 年推出。ImageNet 是用于物体分类、目标检测、图像分割、姿势估计等多种任务的通用数据集,尤其在深度学习和计算机视觉的突破性研究…...

微信小程序手写签名

微信小程序手写签名组件 该组件基于signature_pad封装&#xff0c;signature_pad本身是web端的插件&#xff0c;此处将插件代码修改为小程序端可用。 signature_pad.js /*!* Signature Pad v5.0.3 | https://github.com/szimek/signature_pad* (c) 2024 Szymon Nowak | Releas…...

Javascript 使用中点查找矩形的角(Find Corners of Rectangle using mid points)

考虑一个矩形 ABCD&#xff0c;我们给出了边 AD 和 BC 中点&#xff08;分别为 p 和 q&#xff09;的坐标以及它们的长度 L&#xff08;AD BC L&#xff09;。现在给定参数&#xff0c;我们需要打印 4 个点 A、B、C 和 D 的坐标。 例子&#xff1a; 输入&#xff1a;p (1,…...

【困难】 猿人学web第一届 第18题 jsvmp 洞察先机

文章目录 数据接口分析还原加密参数插桩调试分析日志插桩补充 python 代码 数据接口分析 数据接口 https://match.yuanrenxue.cn/match/18data 请求参数 {page: 页码, t: 时间戳, v: 加密值} 请求第一页不需要携带 t, v 参数 cookie 只需要携带 sessionid 只要 还原加密字段…...

IDEA Maven 源修改为国内阿里云镜像的正确方式

&#x1f49d;&#x1f49d;&#x1f49d;欢迎莅临我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐&#xff1a;「storm…...

OpenCV 旋转矩形边界

边界矩形是用最小面积绘制的&#xff0c;所以它也考虑了旋转。使用的函数是**cv.minAreaRect**()。 import cv2 import numpy as npimgcv2.imread(rD:\PythonProject\thunder.jpg) img1cv2.cvtColor(img,cv2.COLOR_BGR2GRAY) print(img.dtype) ret,threshcv2.threshold(img1,1…...

人车防撞系统安全生产方案

根据《市场监管总局关于2021~2023年全国特种设备安全状况的通告》数据显示&#xff1a;2023年&#xff1a;全国共发生特种设备事故和相关事故71起&#xff0c;其中死亡69人。包含叉车在内的场(厂)内专用机动车辆事故29起、死亡28人&#xff0c;占事故总数的40.85%、死亡人数的4…...

开放式耳机哪个牌子好?长文传授6招秘籍,彻底远离坑货!

​大家好&#xff0c;作为一位专注于评测各类数码产品的博主&#xff0c;今天我特别推荐开放式耳机作为我们日常的首选。这种耳机以其独特的设计&#xff0c;避免了传统耳机长时间佩戴可能带来的不适和感染风险。开放式耳机佩戴简便且稳固&#xff0c;尤其适合热爱跑步和运动的…...

vue2和vue3双向绑定的原理

Vue.js 的双向绑定是 Vue 框架的核心特性之一&#xff0c;它允许数据和视图之间保持同步。虽然 Vue 2 和 Vue 3 都实现了双向绑定&#xff0c;但它们在实现细节上有所不同。 Vue 2 双向绑定的原理 在 Vue 2 中&#xff0c;双向绑定主要依赖于 Object.defineProperty 和观察者…...

别为大文件烦恼!mp4文件太大怎么变小?3个管用方法

你是否曾经遇到过mp4视频文件过大的困扰&#xff1f;每当想要分享或存储mp4文件时&#xff0c;巨大的文件就成了阻碍。明明感觉感觉没占用多少空间&#xff0c;但是设备却常常出现空间过满警告。 没多少空间的设备真是让人大为恼火&#xff0c;没人想多花一份钱买设备。那么只…...

cocotb的接收和发送逻辑,还是没有弄明白

发送有两种方式 1、定义这样的发 通过前缀连接DUT里面的信号 发送的时候&#xff0c;通过.去访问就可以 2、如果是AXI总线&#xff0c;可以直接调用cocotb的库文件 AXIS总线可以包含以下的信号 通过这个类&#xff0c;可以产生一个AXIS的一帧数据 类的实现大概如下 然后也…...

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周&#xff0c;有很多同学在写期末Java web作业时&#xff0c;运行tomcat出现乱码问题&#xff0c;经过多次解决与研究&#xff0c;我做了如下整理&#xff1a; 原因&#xff1a; IDEA本身编码与tomcat的编码与Windows编码不同导致&#xff0c;Windows 系统控制台…...

[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?

&#x1f9e0; 智能合约中的数据是如何在区块链中保持一致的&#xff1f; 为什么所有区块链节点都能得出相同结果&#xff1f;合约调用这么复杂&#xff0c;状态真能保持一致吗&#xff1f;本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里&#xf…...

Chapter03-Authentication vulnerabilities

文章目录 1. 身份验证简介1.1 What is authentication1.2 difference between authentication and authorization1.3 身份验证机制失效的原因1.4 身份验证机制失效的影响 2. 基于登录功能的漏洞2.1 密码爆破2.2 用户名枚举2.3 有缺陷的暴力破解防护2.3.1 如果用户登录尝试失败次…...

无法与IP建立连接,未能下载VSCode服务器

如题&#xff0c;在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈&#xff0c;发现是VSCode版本自动更新惹的祸&#xff01;&#xff01;&#xff01; 在VSCode的帮助->关于这里发现前几天VSCode自动更新了&#xff0c;我的版本号变成了1.100.3 才导致了远程连接出…...

MMaDA: Multimodal Large Diffusion Language Models

CODE &#xff1a; https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA&#xff0c;它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构&#xf…...

[ICLR 2022]How Much Can CLIP Benefit Vision-and-Language Tasks?

论文网址&#xff1a;pdf 英文是纯手打的&#xff01;论文原文的summarizing and paraphrasing。可能会出现难以避免的拼写错误和语法错误&#xff0c;若有发现欢迎评论指正&#xff01;文章偏向于笔记&#xff0c;谨慎食用 目录 1. 心得 2. 论文逐段精读 2.1. Abstract 2…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

BCS 2025|百度副总裁陈洋:智能体在安全领域的应用实践

6月5日&#xff0c;2025全球数字经济大会数字安全主论坛暨北京网络安全大会在国家会议中心隆重开幕。百度副总裁陈洋受邀出席&#xff0c;并作《智能体在安全领域的应用实践》主题演讲&#xff0c;分享了在智能体在安全领域的突破性实践。他指出&#xff0c;百度通过将安全能力…...

css3笔记 (1) 自用

outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size&#xff1a;0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格&#xff…...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...