142. Go操作Kafka(confluent-kafka-go库)
文章目录
- Apache kafka简介
- 开始使用Apache Kafka
- 构建生产者
- 构建消费者
- 总结
之前已经有两篇文章介绍过
Go
如何操作
kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
51.Go操作kafka示例(kafka-go库)
Apache kafka简介
Apache Kafka
是一个开源分布式事件流平台,用于高性能数据管道、流式分析、数据集成和关键任务应用程序。它提供了诸如流数据管道、实时分析、数据集成、事件源和日志聚合等多种能力。
Kafka的用例和能力
-
流数据管道:
Kafka
提供了一个分布式发布-订阅消息系统,可以在系统或应用程序之间流式传输数据。它提供了具有数据复制和容错能力的强大队列。 -
实时分析:
Kafka
允许使用工具如Kafka Streams
和KSQL
处理实时数据流,用于构建流式分析和数据处理应用程序。 -
数据集成 :
Kafka
可以用来通过在不同数据源和格式之间流式传输数据来集成不同的系统。这使它对流式ETL
非常有用。 -
事件源 :
Kafka
提供了可以重放的事件时间日志,用于重构应用程序状态,适用于事件源和CQRS
模式。 -
日志聚合 :
Kafka
通常用于将不同服务器和应用程序的日志聚合到一个中央存储库中。这允许统一访问日志数据。
为什么将Golang与Apache Kafka结合使用
将Golang
这一高效并发的编程语言与Apache Kafka
这一分布式事件流平台结合起来,提供了一个在构建尖端现代应用程序方面表现出色的强大技术栈。这两种技术之间的协同作用源自几个关键优势:
-
性能 :
Golang
和Apache Kafka
都提供高性能。Golang
快速、高效和轻量级。Kafka
为速度而构建,具有高吞吐量和低延迟。它们一起可以处理苛刻的工作负载。 -
可扩展性 :
Golang
的goroutines
和Kafka
的分区允许应用程序水平扩展以处理大量数据。Kafka
可以轻松扩展生产者和消费者。 -
并发性 :
Golang
通过goroutines
和channels
提供了出色的并发编程能力。Kafka
并发传递消息并支持并行性。 -
可用性 :
Kafka
的分布式架构使其高度可用和容错。Golang
应用可以利用这一点来构建弹性系统。 -
互操作性 :
Kafka
有多种语言的客户端,允许Golang
应用与多语言环境互动。Kafka
还使用二进制TCP
协议以提高效率。 -
现代设计 :
Kafka
和Golang
都采用现代设计理念,使它们非常适合云原生和微服务架构。 -
开发人员体验 :
Kafka
的客户端库结合Goroutines、channels
和接口,使其易于使用。
Kafka和Golang将性能、可扩展性和并发与生产力结合在一起 - 使它们成为构建可扩展的服务、管道和流应用程序的绝佳选择。
开始使用Apache Kafka
在开始使用Golang
和Apache Kafka
之前,我们必须确保golang
和Kafka
已经安装并在我们的机器上运行。
安装Kafka
28.windows安装kafka,Go操作kafka示例(sarama库)
Apache Kafka的Golang包
您可以使用go get
安装confluent-kafka-go
包:
go get -u github.com/confluentinc/confluent-kafka-go/kafka
安装后,您可以在Go
代码中导入并使用confluent-kafka-go
。
package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})if err != nil {fmt.Printf("创建生产者失败: %s\n", err)return}// 生产消息到主题,处理交付报告等。// 使用后记得关闭生产者defer p.Close()
}
构建生产者
Kafka
生产者是Apache Kafka
生态系统中的一个关键组成部分,作为一个客户端应用程序,负责向Kafka
集群发布(写入)事件。这一部分提供了关于Kafka
生产者的全面概述,以及针对调整其行为的配置设置的初步探讨。
下面是一个Golang
应用程序的示例,它生产数据并将其发布到Kafka
的具体topic
。它还说明了如何在Golang
中为Kafka
消息序列化数据,并演示了如何处理错误和重试。
package mainimport ("fmt""github.com/confluentinc/confluent-kafka-go/kafka"
)const (kafkaBroker = "localhost:9092"topic = "test-topic"
)type Messagestruct {Key string `json:"key"`Value string `json:"value"`
}func main() {// 创建一个新的Kafka生产者p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker})if err != nil {fmt.Printf("创建生产者失败: %s\n", err)return}defer p.Close()// 定义要发送的消息message := Message{Key: "example_key",Value: "Hello, Kafka!",}// 序列化消息serializedMessage, err := serializeMessage(message)if err != nil {fmt.Printf("消息序列化失败: %s\n", err)return}// 将消息生产到Kafka主题err = produceMessage(p, topic, serializedMessage)if err != nil {fmt.Printf("消息生产失败: %s\n", err)return}fmt.Println("消息成功生产!")
}func serializeMessage(message Message) ([]byte, error) {// 将消息结构体序列化为JSONserialized, err := json.Marshal(message)if err != nil {return nil, fmt.Errorf("消息序列化失败: %w", err)}return serialized, nil
}func produceMessage(p *kafka.Producer, topic string, message []byte) error {// 创建一个新的要生产的Kafka消息kafkaMessage := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value: message,}// 生产Kafka消息deliveryChan := make(chan kafka.Event)err := p.Produce(kafkaMessage, deliveryChan)if err != nil {return fmt.Errorf("消息生产失败: %w", err)}// 等待交付报告或错误e := <-deliveryChanm := e.(*kafka.Message)// 检查交付错误,即生成者方确保发送到Broker的消息不丢失// 但可能重复发送,如实际发成功了,但是Broker的ACK返回给生产者时出现了网络Error// 从而重试后,导致消息重复发送,这时候需要下游做好幂等处理if m.TopicPartition.Error != nil {return fmt.Errorf("交付失败: %s", m.TopicPartition.Error)}// 关闭交付频道close(deliveryChan)return nil
}
步骤解释:
-
创建一个
Kafka
生产者。 -
使用
json.Marshal
函数将自定义消息结构体(Message
)序列化为JSON
。 -
使用生产者将序列化的消息生产到
Kafka topic
。 -
使用交付报告和错误检查处理错误和重试。
确保将localhost:9092
替换为您的Kafka
代理地址,将test-topic
替换为所需的主题名称。此外,您可能需要处理更复杂的错误场景并根据您的具体需求实现重试逻辑
。
构建消费者
Kafka
消费者就像小型事件处理器,它们获取并消化数据流。它们订阅主题并消费任何新到达的消息,处理每一个消息。我们将探讨这些消费者的内部工作原理和调整其性能的配置。
下面是一个Golang
应用程序的示例,它从Kafka
主题消费消息。如下代码包括了如何处理消费到的消息的说明,以及对不同消费模式(如单个消费者和消费者组)的讨论。
package mainimport ("fmt""os""os/signal""github.com/confluentinc/confluent-kafka-go/kafka"
)const (kafkaBroker = "localhost:9092"topic = "test-topic"groupID = "test-group"
)func main() {// 创建一个新的Kafka消费者c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": kafkaBroker,"group.id": groupID, // 消费者组标识"auto.offset.reset": "earliest", // 从头开始消费})if err != nil {fmt.Printf("创建消费者失败: %s\n", err)return}defer c.Close()// 订阅Kafka主题err = c.SubscribeTopics([]string{topic}, nil)if err != nil {fmt.Printf("订阅主题失败: %s\n", err)return}// 设置一个通道来处理操作系统信号,以便优雅地关闭sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, os.Interrupt)// 开始消费消息run := truefor run == true {select {case sig := <-sigchan:fmt.Printf("接收到信号 %v: 正在终止\n", sig)run = falsedefault:// 轮询Kafka消息,1次最多拉取100条消息ev := c.Poll(100) if ev == nil {continue}switch e := ev.(type) {case *kafka.Message:// 处理消费的消息fmt.Printf("从主题 %s 收到消息: %s\n", *e.TopicPartition.Topic, string(e.Value))case kafka.Error:// 处理Kafka错误fmt.Printf("错误: %v\n", e)}}}
}
步骤解释:
-
创建一个
Kafka
消费者。 -
订阅一个
Kafka
主题。 -
设置一个通道来处理操作系统信号(如
SIGINT
)以优雅地关闭。 -
开始从订阅的
Topic
消费消息。 -
处理消费的消息以及
Kafka
错误。
不同的消费模式:
-
单个消费者:在这种模式下,单个消费者实例从主题的一个或多个分区读取消息。当您只需要一个消费者应用程序实例来处理来自
Topic
的所有消息时,这很有用。 -
消费者组:消费者组允许您通过将消息处理分布到多个消费者实例来扩展消费,以实现扩展。每个消费者组可以有多个消费者,组内的每个消费者从一部分分区读取消息。这使得消息的并行处理成为可能,提供了容错能力和高吞吐量。
在提供的示例中,group.id
配置设置用于指定消费者组ID
。这允许消费者应用程序的多个实例在消费者组中一起工作,从Kafka Topic
消费消息。
总结
总之,Apache Kafka
作为构建实时数据管道和流应用程序的强大解决方案,得益于其分布式、可扩展和容错的架构。当与Golang
结合时,它形成了一个在性能、可扩展性和并发方面表现出色的强大技术栈,非常适合现代应用程序。通过利用Kafka
的功能和Golang
的优势,开发人员可以构建出具有弹性和高性能的服务、管道和流应用程序,这些应用程序可以轻松扩展以满足当今数据驱动世界的需求。无论是处理实时分析、集成不同的系统还是聚合日志,Kafka
和Golang
提供了一个强势组合,使开发人员能够轻松构建创新和可扩展的解决方案。
相关文章:

142. Go操作Kafka(confluent-kafka-go库)
文章目录 Apache kafka简介开始使用Apache Kafka构建生产者构建消费者 总结 之前已经有两篇文章介绍过 Go如何操作 kafka 28.windows安装kafka,Go操作kafka示例(sarama库) 51.Go操作kafka示例(kafka-go库) Apache ka…...

spring boot(学习笔记第十九课)
spring boot(学习笔记第十九课) Spring boot的batch框架,以及Swagger3(OpenAPI)整合 学习内容: Spring boot的batch框架Spring boot的Swagger3(OpenAPI)整合 1. Spring boot batch框架 Spring Batch是什么 Spring Batch 是一个…...
docker安装 redis 并且加密开启SSL/TLS通道
拉取镜像 docker pull registry.cn-hangzhou.aliyuncs.com/qiluo-images/redis:latest docker tag registry.cn-hangzhou.aliyuncs.com/qiluo-images/redis:latest redis:latest要在 Docker 容器中启动 Redis 并开启 SSL/TLS 加密,需按照以下步骤修改启动命令和配置…...

什么是ARM架构?什么是X86架构?两者的区别是什么?
一、什么是ARM架构 (一)起源于发展 ARM 架构由英国剑桥的 Acorn 计算机公司开发。因市场无合适产品,Acorn 自行设计出第一款微处理器,命名为 ARM。此后 ARM 架构不断发展,1990 年为与苹果合作成立 ARM 公司࿰…...

【vscode】vscode paste image插件设置
本文首发于 ❄️慕雪的寒舍 vscode编辑md文件的时候,如果想插入图片,自带的粘贴只会粘贴到当前目录下,也没有文件重命名,很不友好。 在扩展商店里面有mushan的Paste Image插件,相比自带的,更加友好一点。但…...

自定义string类
#include <iostream> #include <string> int main() { std::string str "Hello, World!"; // 使用 c_str() 将 std::string 转换为 C 风格字符串,并传递给 printf printf("The string is: %s\n", str.c_str()); // 尝试修改…...

Python | Leetcode Python题解之第387题字符串中的第一个唯一字符
题目: 题解: class Solution:def firstUniqChar(self, s: str) -> int:position dict()q collections.deque()n len(s)for i, ch in enumerate(s):if ch not in position:position[ch] iq.append((s[i], i))else:position[ch] -1while q and po…...
RocketMQ 消费时序列化报错问题分析及解决
问题背景 在2024年3月7日,系统消费 RocketMQ 消息时出现了序列化报错,错误信息显示为: java.io.InvalidClassException: com.xxx.xxx.bean.mg.GoodsChangeLogMessage; local class incompatible: stream classdesc serialVersionUID... 这是…...

全能与专精:探索未来AI模型的发展趋势与市场潜力
文章目录 每日一句正能量前言AI模型的全面评估和比较AI模型的专精化和可扩展性AI模型的合理使用和道德规范后记 每日一句正能量 一个人,如果没有经受过投资失败的痛楚,又怎么会看到绝望之后的海阔天空。很多时候,经历了人生中最艰难的事&…...
Python深度学习:【开源数据集系列】ImageNet数据集
ImageNet 是一个大规模的视觉数据集,是计算机视觉领域最重要的基准数据集之一。该数据集由普林斯顿大学和斯坦福大学的研究人员发起,于 2009 年推出。ImageNet 是用于物体分类、目标检测、图像分割、姿势估计等多种任务的通用数据集,尤其在深度学习和计算机视觉的突破性研究…...

微信小程序手写签名
微信小程序手写签名组件 该组件基于signature_pad封装,signature_pad本身是web端的插件,此处将插件代码修改为小程序端可用。 signature_pad.js /*!* Signature Pad v5.0.3 | https://github.com/szimek/signature_pad* (c) 2024 Szymon Nowak | Releas…...

Javascript 使用中点查找矩形的角(Find Corners of Rectangle using mid points)
考虑一个矩形 ABCD,我们给出了边 AD 和 BC 中点(分别为 p 和 q)的坐标以及它们的长度 L(AD BC L)。现在给定参数,我们需要打印 4 个点 A、B、C 和 D 的坐标。 例子: 输入:p (1,…...

【困难】 猿人学web第一届 第18题 jsvmp 洞察先机
文章目录 数据接口分析还原加密参数插桩调试分析日志插桩补充 python 代码 数据接口分析 数据接口 https://match.yuanrenxue.cn/match/18data 请求参数 {page: 页码, t: 时间戳, v: 加密值} 请求第一页不需要携带 t, v 参数 cookie 只需要携带 sessionid 只要 还原加密字段…...

IDEA Maven 源修改为国内阿里云镜像的正确方式
💝💝💝欢迎莅临我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:「storm…...

OpenCV 旋转矩形边界
边界矩形是用最小面积绘制的,所以它也考虑了旋转。使用的函数是**cv.minAreaRect**()。 import cv2 import numpy as npimgcv2.imread(rD:\PythonProject\thunder.jpg) img1cv2.cvtColor(img,cv2.COLOR_BGR2GRAY) print(img.dtype) ret,threshcv2.threshold(img1,1…...

人车防撞系统安全生产方案
根据《市场监管总局关于2021~2023年全国特种设备安全状况的通告》数据显示:2023年:全国共发生特种设备事故和相关事故71起,其中死亡69人。包含叉车在内的场(厂)内专用机动车辆事故29起、死亡28人,占事故总数的40.85%、死亡人数的4…...

开放式耳机哪个牌子好?长文传授6招秘籍,彻底远离坑货!
大家好,作为一位专注于评测各类数码产品的博主,今天我特别推荐开放式耳机作为我们日常的首选。这种耳机以其独特的设计,避免了传统耳机长时间佩戴可能带来的不适和感染风险。开放式耳机佩戴简便且稳固,尤其适合热爱跑步和运动的…...
vue2和vue3双向绑定的原理
Vue.js 的双向绑定是 Vue 框架的核心特性之一,它允许数据和视图之间保持同步。虽然 Vue 2 和 Vue 3 都实现了双向绑定,但它们在实现细节上有所不同。 Vue 2 双向绑定的原理 在 Vue 2 中,双向绑定主要依赖于 Object.defineProperty 和观察者…...

别为大文件烦恼!mp4文件太大怎么变小?3个管用方法
你是否曾经遇到过mp4视频文件过大的困扰?每当想要分享或存储mp4文件时,巨大的文件就成了阻碍。明明感觉感觉没占用多少空间,但是设备却常常出现空间过满警告。 没多少空间的设备真是让人大为恼火,没人想多花一份钱买设备。那么只…...

cocotb的接收和发送逻辑,还是没有弄明白
发送有两种方式 1、定义这样的发 通过前缀连接DUT里面的信号 发送的时候,通过.去访问就可以 2、如果是AXI总线,可以直接调用cocotb的库文件 AXIS总线可以包含以下的信号 通过这个类,可以产生一个AXIS的一帧数据 类的实现大概如下 然后也…...
web vue 项目 Docker化部署
Web 项目 Docker 化部署详细教程 目录 Web 项目 Docker 化部署概述Dockerfile 详解 构建阶段生产阶段 构建和运行 Docker 镜像 1. Web 项目 Docker 化部署概述 Docker 化部署的主要步骤分为以下几个阶段: 构建阶段(Build Stage):…...
前端倒计时误差!
提示:记录工作中遇到的需求及解决办法 文章目录 前言一、误差从何而来?二、五大解决方案1. 动态校准法(基础版)2. Web Worker 计时3. 服务器时间同步4. Performance API 高精度计时5. 页面可见性API优化三、生产环境最佳实践四、终极解决方案架构前言 前几天听说公司某个项…...

微信小程序 - 手机震动
一、界面 <button type"primary" bindtap"shortVibrate">短震动</button> <button type"primary" bindtap"longVibrate">长震动</button> 二、js逻辑代码 注:文档 https://developers.weixin.qq…...
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别
OpenPrompt 和直接对提示词的嵌入向量进行训练有什么区别 直接训练提示词嵌入向量的核心区别 您提到的代码: prompt_embedding = initial_embedding.clone().requires_grad_(True) optimizer = torch.optim.Adam([prompt_embedding...
css3笔记 (1) 自用
outline: none 用于移除元素获得焦点时默认的轮廓线 broder:0 用于移除边框 font-size:0 用于设置字体不显示 list-style: none 消除<li> 标签默认样式 margin: xx auto 版心居中 width:100% 通栏 vertical-align 作用于行内元素 / 表格单元格ÿ…...

云原生玩法三问:构建自定义开发环境
云原生玩法三问:构建自定义开发环境 引言 临时运维一个古董项目,无文档,无环境,无交接人,俗称三无。 运行设备的环境老,本地环境版本高,ssh不过去。正好最近对 腾讯出品的云原生 cnb 感兴趣&…...

Web后端基础(基础知识)
BS架构:Browser/Server,浏览器/服务器架构模式。客户端只需要浏览器,应用程序的逻辑和数据都存储在服务端。 优点:维护方便缺点:体验一般 CS架构:Client/Server,客户端/服务器架构模式。需要单独…...
深入浅出Diffusion模型:从原理到实践的全方位教程
I. 引言:生成式AI的黎明 – Diffusion模型是什么? 近年来,生成式人工智能(Generative AI)领域取得了爆炸性的进展,模型能够根据简单的文本提示创作出逼真的图像、连贯的文本,乃至更多令人惊叹的…...

Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement
Cilium动手实验室: 精通之旅---13.Cilium LoadBalancer IPAM and L2 Service Announcement 1. LAB环境2. L2公告策略2.1 部署Death Star2.2 访问服务2.3 部署L2公告策略2.4 服务宣告 3. 可视化 ARP 流量3.1 部署新服务3.2 准备可视化3.3 再次请求 4. 自动IPAM4.1 IPAM Pool4.2 …...
Python 高级应用10:在python 大型项目中 FastAPI 和 Django 的相互配合
无论是python,或者java 的大型项目中,都会涉及到 自身平台微服务之间的相互调用,以及和第三发平台的 接口对接,那在python 中是怎么实现的呢? 在 Python Web 开发中,FastAPI 和 Django 是两个重要但定位不…...