当前位置: 首页 > article >正文

【kafka】kafka概念,使用技巧go示例

1. Kafka基础概念

1.1 什么是Kafka?

Kafka是一个分布式流处理平台,用于构建实时数据管道和流式应用。核心特点:

  • 高吞吐量:每秒可处理百万级消息
  • 持久化存储:消息按Topic分区存储在磁盘
  • 分布式架构:支持水平扩展
  • 高可用性:通过副本机制保证数据不丢失
1.2 核心组件
  • Topic(主题):消息的逻辑分类,如user_loginorder_create
  • Partition(分区):Topic的物理分片,每个分区是有序的日志文件
  • Broker(代理):Kafka集群中的服务器节点
  • Producer(生产者):向Topic发送消息的应用
  • Consumer(消费者):从Topic接收消息的应用
  • Consumer Group(消费者组):多个消费者组成的组,共同消费Topic数据

2. Go语言操作Kafka

2.1 选择客户端库

Go语言中推荐使用confluent-kafka-go库,它基于librdkafka实现,性能优秀且功能完整:

go get -u github.com/confluentinc/confluent-kafka-go/kafka
2.2 生产者示例
package mainimport ("fmt""os""os/signal""syscall""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 配置生产者p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092",  // Kafka集群地址"acks":              "all",             // 所有副本确认"retries":           5,                 // 重试次数})if err != nil {panic(err)}defer p.Close()// 异步处理发送结果go func() {for e := range p.Events() {switch ev := e.(type) {case *kafka.Message:if ev.TopicPartition.Error != nil {fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)} else {fmt.Printf("Delivered message to %v\n", ev.TopicPartition)}}}}()// 发送消息topic := "user_login"for i := 0; i < 10; i++ {value := fmt.Sprintf("Hello Kafka %d", i)p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          []byte(value),}, nil)}// 等待所有消息发送完成p.Flush(15 * 1000)  // 超时15秒// 优雅退出sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)<-sigchan
}
2.3 消费者示例
package mainimport ("fmt""os""os/signal""syscall""github.com/confluentinc/confluent-kafka-go/kafka"
)func main() {// 配置消费者c, err := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092","group.id":          "my-group","auto.offset.reset": "earliest",  // 从最早的消息开始消费})if err != nil {panic(err)}defer c.Close()// 订阅主题topic := "user_login"c.SubscribeTopics([]string{topic}, nil)// 处理信号,优雅退出sigchan := make(chan os.Signal, 1)signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)run := truefor run {select {case sig := <-sigchan:fmt.Printf("Caught signal %v: terminating\n", sig)run = falsedefault:ev := c.Poll(100)  // 轮询100msif ev == nil {continue}switch e := ev.(type) {case *kafka.Message:fmt.Printf("Message on %s: %s\n",e.TopicPartition, string(e.Value))// 手动提交偏移量c.CommitMessage(e)case kafka.Error:fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)if e.Code() == kafka.ErrAllBrokersDown {run = false}default:// 忽略其他事件}}}fmt.Println("Closing consumer")
}

3. 高级特性与最佳实践

3.1 消息分区策略

Kafka通过分区实现并行处理,生产者可指定分区策略:

// 1. 轮询(默认):均匀分布消息到各分区
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          []byte(value),
}, nil)// 2. 基于Key哈希:相同Key的消息发到同一分区
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Key:            []byte(userID),  // 根据用户ID哈希到固定分区Value:          []byte(value),
}, nil)
3.2 消费者组与分区分配
  • 同一消费者组内的消费者共同消费Topic的所有分区
  • 每个分区只能被组内一个消费者消费
  • 消费者数量超过分区数时,多余的消费者空闲
3.3 手动提交偏移量
// 配置手动提交
config := &kafka.ConfigMap{"bootstrap.servers": "localhost:9092","group.id":          "my-group","enable.auto.commit": false,  // 禁用自动提交
}// 消费消息后手动提交
for {msg, err := c.ReadMessage(-1)  // 阻塞读取if err == nil {fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))// 处理消息...// 手动提交当前消息的偏移量_, err := c.CommitMessage(msg)if err != nil {fmt.Printf("Failed to commit offset: %v\n", err)}}
}
3.4 事务处理
// 配置事务生产者
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092","transactional.id":  "my-transactional-id",
})
if err != nil {panic(err)
}// 初始化事务
p.InitTransactions(10 * time.Second)// 开始事务
p.BeginTransaction()// 发送多条消息
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic1}, Value: []byte("msg1")}, nil)
p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic2}, Value: []byte("msg2")}, nil)// 提交事务
err = p.CommitTransaction(10 * time.Second)
if err != nil {p.AbortTransaction(10 * time.Second)  // 回滚
}

4. 企业级实战案例

4.1 异步日志处理
// 生产者:收集应用日志发送到Kafka
func LogToKafka(level, message string) {p, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092"})defer p.Close()topic := "app_logs"msg := &kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Key:            []byte(level),Value:          []byte(message),}p.Produce(msg, nil)p.Flush(2 * 1000)  // 等待2秒
}// 消费者:从Kafka读取日志并存储到Elasticsearch
func ConsumeAndIndex() {c, _ := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092","group.id":          "log-consumer-group",})c.SubscribeTopics([]string{"app_logs"}, nil)for {msg, err := c.ReadMessage(-1)if err == nil {// 发送到ElasticsearchsendToES(string(msg.Key), string(msg.Value))}}
}
4.2 微服务间事件驱动通信
// 订单服务:创建订单后发送事件
func CreateOrder(userID, productID string, amount float64) {// 1. 创建订单orderID := generateOrderID()saveOrderToDB(orderID, userID, productID, amount)// 2. 发送订单创建事件到Kafkap, _ := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092"})defer p.Close()topic := "order_created"event := fmt.Sprintf(`{"order_id": "%s", "user_id": "%s", "amount": %.2f}`, orderID, userID, amount)p.Produce(&kafka.Message{TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},Value:          []byte(event),}, nil)
}// 库存服务:监听订单创建事件并扣减库存
func StartInventoryService() {c, _ := kafka.NewConsumer(&kafka.ConfigMap{"bootstrap.servers": "kafka:9092","group.id":          "inventory-service-group",})c.SubscribeTopics([]string{"order_created"}, nil)for {msg, err := c.ReadMessage(-1)if err == nil {// 解析订单事件var orderEvent struct {OrderID string  `json:"order_id"`UserID  string  `json:"user_id"`Amount  float64 `json:"amount"`}json.Unmarshal(msg.Value, &orderEvent)// 扣减库存deductInventory(orderEvent.ProductID, 1)}}
}

5. 性能优化与常见问题

5.1 生产者性能优化
  • 批量发送:设置batch.sizelinger.ms
  • 压缩消息:启用compression.type(如snappylz4
  • 异步发送:使用回调函数处理发送结果
5.2 消费者性能优化
  • 增加分区数:提高并行消费能力
  • 多消费者实例:通过消费者组水平扩展
  • 合理批量处理:批量拉取消息,批量提交偏移量
5.3 常见问题排查
问题原因解决方案
消息丢失acks配置不当、副本数不足设置acks=all,确保至少2个副本
消费滞后消费速度慢、分区数不足增加消费者、提高处理效率、增加分区数
重复消费偏移量提交时机不当处理完消息后再提交偏移量,或使用事务
生产者吞吐量低批处理参数不合理、网络延迟增大batch.sizelinger.ms,优化网络连接

6. 生产环境部署建议

  1. 多Broker集群:至少3个Broker,提高可用性
  2. 合理分区数:根据业务量预估,建议单个Topic分区数≥3
  3. 数据备份:定期备份Kafka日志
  4. 监控系统:集成Prometheus、Grafana监控Kafka性能
  5. 安全配置:启用SSL/TLS加密、SASL认证

总结:Go语言使用Kafka的最佳实践

  1. 生产者

    • 使用异步发送提高吞吐量
    • 合理配置acks和重试次数保证消息不丢失
    • 根据业务需求选择分区策略
  2. 消费者

    • 使用消费者组实现水平扩展
    • 手动提交偏移量确保消息处理可靠性
    • 处理消息失败时考虑重试或死信队列
  3. 性能与可靠性

    • 批量处理提高效率
    • 监控关键指标(如Lag、吞吐量)
    • 设计幂等消费逻辑应对重复消息

https://github.com/0voice

相关文章:

【kafka】kafka概念,使用技巧go示例

1. Kafka基础概念 1.1 什么是Kafka&#xff1f; Kafka是一个分布式流处理平台&#xff0c;用于构建实时数据管道和流式应用。核心特点&#xff1a; 高吞吐量&#xff1a;每秒可处理百万级消息持久化存储&#xff1a;消息按Topic分区存储在磁盘分布式架构&#xff1a;支持水平…...

利用散点图探索宇航员特征与太空任务之间的关系

利用散点图探索宇航员特征与太空任务之间的关系 import matplotlib.pyplot as plt import numpy as np import pandas as pdfrom flexitext import flexitext from matplotlib.patches import FancyArrowPatchplt.rcParams.update({"font.family": "Corbel&quo…...

Ubuntu 命令行显示中文输出信息

Ctrl Alt T 打开终端命令行, 输入命令: sudo apt-get install language-pack-zh-hans安装中文语言支持包 sudo apt-get install language-pack-zh-hans-base配置环境变量 sudo vim /etc/profile进入文件后&#xff0c;按下 a 进入编辑模式&#xff0c;shift ↓ \downarr…...

Linux文件编程——read函数与lseek函数

一、read函数 在 Linux 文件编程中&#xff0c;read 函数是一个系统调用&#xff0c;用于从文件描述符&#xff08;File Descriptor&#xff09;指向的文件或设备中读取数据到缓冲区。它是 Unix/Linux 系统编程中实现底层 I/O 操作的核心函数之一。以下是 read 函数的详细使用…...

[思维模式-38]:看透事物的关系:什么是事物的关系?事物之间的关系的种类?什么是因果关系?如何通过数学的方式表达因果关系?

一、什么是事物的关系&#xff1f; 事物的关系是指不同事物之间存在的各种联系和相互作用&#xff0c;它反映了事物之间的相互依存、相互影响、相互制约等特性。以下从不同维度为你详细阐述&#xff1a; 1、关系的类型 因果关系 定义&#xff1a;一个事件&#xff08;原因&a…...

【2025.5.12】视觉语言模型 (更好、更快、更强)

【2025.5.12】Vision Language Models (Better, Faster, Stronger)&#xff1a; https://huggingface.co/blog/vlms-2025 【2024.4.11】Vision Language Models Explained【先了解视觉语言模型是什么】&#xff1a; https://huggingface.co/blog/vlms nanoVLM: https://github.…...

Spring的bean的生命周期?

Spring中bean的生命周期包括以下步骤&#xff1a; 通过BeanDefinition获取bean的定义信息。 调用构造函数实例化bean。 进行bean的依赖注入&#xff0c;例如通过setter方法或Autowired注解。 处理实现了Aware接口的bean。 执行BeanPostProcessor的前置处理器。 调用初始化…...

Qwen集成clickhouse实现RAG

一、RAG概要 RAG&#xff08;Retrieval-Augmented Generation&#xff0c;检索增强生成&#xff09; 是一种结合了信息检索技术与语言生成模型的人工智能技术。旨在通过检索相关文档来增强大模型的生成能力&#xff0c;从而提高预测的质量和准确性。RAG模型在生成文本或回答…...

Excel分组计算求和的两种实现方案

文章目录 背景样例数据方案一、函数求和实现步骤缺点 方案二、数据透视表实现步骤优点 背景 在Excel文档中&#xff0c;经常会进行数据的求和计算&#xff0c;可使用不同的方式实现&#xff0c;记录下来&#xff0c;方便备查。 样例数据 已有商品销量信息&#xff0c;包含销…...

深入理解卷积神经网络:从基础原理到实战应用

在人工智能领域&#xff0c;卷积神经网络&#xff08;Convolutional Neural Network&#xff0c;简称 CNN&#xff09;凭借其强大的图像识别、处理能力&#xff0c;成为深度学习中不可或缺的技术。无论是自动驾驶汽车识别道路标志&#xff0c;还是医学影像分析辅助疾病诊断&…...

LLM定制新路径:微调与上下文学习的博弈与融合

在当今人工智能的浪潮中&#xff0c;大型语言模型&#xff08;LLMs&#xff09;已成为推动行业进步的关键力量。无论是自然语言处理、文本生成还是多模态应用&#xff0c;LLMs都在展现着它们的强大能力。然而&#xff0c;当我们将这些强大的模型应用于特定的下游任务时&#xf…...

【江苏省】《信息技术应用创新软件适配改造成本评估规范》(DB32/T 4935-2024)-标准解读系列

在信息技术应用创新产业蓬勃发展的当下&#xff0c;软件适配改造成本评估成为项目实施的关键环节。《DB32/T 4935-2024 信息技术应用创新软件适配改造成本评估规范》应运而生&#xff0c;为成本评估提供了专业依据。同时&#xff0c;《省级政务信息化项目建设方案编制规范&…...

JDK 命令行工具大全与学习方法总结 —— 从帮助文档到高效实践

JDK 命令行工具大全与学习方法总结 —— 从帮助文档到高效实践 Java开发与运维过程中&#xff0c;JDK自带的命令行工具是定位问题、性能调优、编译调试的基石。本文全面梳理JDK常用命令工具、帮助文档的获取方式&#xff0c;并总结类似Linux命令行的学习方法&#xff0c;助你系…...

嵌入式中深入理解C语言中的指针:类型、区别及应用

在嵌入式开发中,C语言是一种基础且极为重要的编程语言,其中指针作为一个非常强大且灵活的工具,广泛应用于内存管理、动态数据结构的实现以及函数参数的传递等方面。然而,尽管指针的使用极为常见,很多开发者在掌握其基本使用后,往往对指针的深入理解还不够。本文将深入分析…...

香港维尔利健康科技集团成都区域运营中心投入使用,西南市场战略全面提速

近日&#xff0c;香港维尔利健康科技集团正式宣布&#xff0c;其位于四川成都的西南区域运营中心已全面建成并投入使用。该中心将集设备调配、技术支持、客户服务、运营管理及数字健康平台维护于一体&#xff0c;成为集团在中国内地智慧医疗战略版图中的关键枢纽&#xff0c;对…...

STM32CubeMX HAL库 串口的使用

1.配置 2.开启中断后&#xff0c;生成代码 3.串口的接收 1&#xff09;.开启空闲中断接收 __HAL_UART_ENABLE_IT(huart, UART_IT_IDLE); // 关键步骤&#xff1a;启用空闲中断 2&#xff09;. 启动接收 调用 HAL_UARTEx_ReceiveToIdle_IT 启动异步接收&#xff0c;可以使用…...

二手车估值接口介绍

二手车估值接口是基于大数据和机器学习技术开发的工具&#xff0c;旨在为二手车交易、金融评估等场景提供快速、精准的车辆价值评估服务。以下从核心功能、技术原理、接口特点及应用场景等方面进行综合介绍&#xff1a; 一、核心功能 多维度数据采集与分析 接口整合了车辆基础…...

flutter 视频通话flutter_webrtc

flutter 比较热门的库 flutter_webrtc | Flutter package agora_rtc_engine | Flutter package 我使用的是flutter_webrtc 下面是官方推荐的demo库 GitHub - flutter-webrtc/flutter-webrtc-demo: Demo for flutter-webrtc 其中 https://demo.cloudwebrtc.com:8086/ 已经停…...

Babylon.js学习之路《四、Babylon.js 中的相机(Camera)与视角控制》

文章目录 1. 引言&#xff1a;为什么相机是 3D 场景的“眼睛”&#xff1f;1.1 相机的核心作用1.2 常见相机类型概览 2. 相机基础参数解析2.1 通用属性2.2 相机坐标系 3. 详解常用相机类型3.1 自由相机&#xff08;FreeCamera&#xff09;3.2 弧形旋转相机&#xff08;ArcRotat…...

【Redis实战篇】秒杀优化

1. 秒杀优化-异步秒杀思路 我们来回顾一下下单流程 当用户发起请求&#xff0c;此时会请求nginx&#xff0c;nginx会访问到tomcat&#xff0c;而tomcat中的程序&#xff0c;会进行串行操作&#xff0c;分成如下几个步骤 1、查询优惠卷 2、判断秒杀库存是否足够 3、查询订单…...

RHCE认证通过率

红帽RHCE考试总体通过率38%&#xff08;2023年数据&#xff09;&#xff0c;细分数据显示自学者通过率18%&#xff0c;参加官方培训者47%&#xff0c;企业团体考生53%。通过率差异由备考资源和考试策略决定。 RHCE考试重点考Ansible自动化运维&#xff0c;需在3.5小时内完成12…...

外贸礼品禁忌

一、亚洲 1.印度 牛是神圣动物&#xff0c;别送牛皮制品。另外&#xff0c;左手不洁&#xff0c;送礼得用右手或双手。 2.日本 “梳” 和 “苦” 谐音&#xff0c;不送梳子。日本男性不咋佩戴首饰&#xff0c;除结婚戒指。礼物得装盒、纸包、绳饰&#xff0c;白色包装得有…...

Trae IDE:AI深度集成的智能开发环境

&#xff08;以高效人机协作重塑编程体验&#xff09; 概述 Trae IDE&#xff08;发音 /treɪ/&#xff09;是一款深度集成AI能力的现代化开发工具&#xff0c;结合传统IDE的完备功能与前沿AI技术&#xff0c;提供智能问答、代码自动补全、跨文件编程及AI Agent驱动的自动化开…...

【大模型】AI智能体Coze 知识库从使用到实战详解

目录 一、前言 二、知识库介绍 2.1 coze 知识库功能介绍 2.2 coze 知识库应用场景 2.3 coze 知识库类型 2.4 coze 知识库权限说明 2.5 coze 知识库与记忆对比 2.6 知识库的使用流程 三、知识库创建与使用 3.1 创建知识库入口 3.2 创建文本知识库 3.2.1 上传文件 3.…...

【springcloud学习(dalston.sr1)】服务消费者通过restTemplate来访问服务提供者(含源代码)(五)

该系列项目整体介绍及源代码请参照前面写的一篇文章​​​​​​【springcloud学习(dalston.sr1)】项目整体介绍&#xff08;含源代码&#xff09;&#xff08;一&#xff09; 一般情况下&#xff0c;我们远程调用服务&#xff0c;可以用restTemplate来进行http请求的访问。接…...

打破边界,智评未来:AI如何重塑学科交叉融合的评价体系?

目录: 引言:当“学科孤岛”遇上“创新浪潮”透视现状:学科交叉融合的“热望”与“冰壁”他山之石:国际交叉融合模式与评价的“镜与灯”AI赋能:重构学科交叉评价的内涵、要素与方法论 4.1. 基本内涵:从“知识叠加”到“价值涌现”4.2. 评价要素:超越“单点指标”的“网络…...

ULVAC C30HMVRT系列冷冻泵和超捕集器压缩机组 安装、操作、维护和故障排除说明 含电路图

ULVAC C30HMVRT系列冷冻泵和超捕集器压缩机组 安装、操作、维护和故障排除说明 含电路图...

ORACLE查看归档是否打开

一、使用V$DATABASE视图 SELECT log_mode FROM v$database; 结果说明&#xff1a; ARCHIVELOG - 数据库处于归档模式 NOARCHIVELOG - 数据库处于非归档模式 二、 使用v$instance视图 SELECT archiver FROM v$instance; 结果说明&#xff1a; STARTED - 归档进程已启动(归档模…...

鸿蒙5.0项目开发——鸿蒙天气项目的实现(介绍)

【高心星出品】 文章目录 项目简介&#xff1a;项目运行效果图&#xff1a;主要功能&#xff1a;使用的技能点&#xff1a;开发环境&#xff1a; 项目简介&#xff1a; 这是一个基于鸿蒙系统&#xff08;HarmonyOS&#xff09;开发的天气应用&#xff0c;采用 ArkTS 语言开发&…...

3Dblox

TSMC 3Dblox Introduction 3Dblox是TSMC定义的一门语言&#xff0c;目标是将物理封装系统分解为模块化的组件&#xff0c;然后进行集成 RDL : 代表interposer的部分 Die的实例化信息 堆叠信息 连接信息 thickness&#xff1a;Die与Die连接Bump的高度 RedHawk-SC-Electrothermal…...