当前位置: 首页 > news >正文

kafka-go使用:以及kafka一些基本概念说明

关于kafka

作为开发人员kafka中最常关注的几个概念,是topic,partition和group这几个概念。topic是主题的意思,简单的说topic是数据主题,这样解释好像显得很苍白,只是做了个翻译。一图胜前言,我们还是通过图解来说明。

生产者负责写数据,一个topic可以有多个分区。如下图所示,生产者写数据的示意。从这个图中,我们可以得出一个很重要的信息:分区有序,即消息在某个分区上是有顺序的,而全局是没有顺序的。这个意味着如果我们需要保证顺序,我们在写消息时需要往同一个分区中写数据。比如,我们有一个场景,有一个订单。首先,创建支付订单,发送一个kafka消息,然后,实际支付,发送一个kafka消息,最后,又想退款,又发起了退款,又发送了一个kafka消息。如果,这三个消息在不同的分区上,我们就无法保证,我们按照创建支付订单——支付——退款这个顺序执行。依据分区有序的特点,我们可以把跟这个订单相关的所有操作的消息都写到一个分区上,比如,可以通过根据订单id进行hash。

group只跟消费者有关系,消费者通过group进行标识,一个消费者实例表示一个消费者,消费者实例可以在不同的线程中开启,也可以在不同的进程中开启。这可以提升消费的并发能力。那么,这是否意味着可以无限开启消费者实例,以提升消费者消费消息的速度呢?

这就涉及到消费的逻辑,我先给出结论。接下来,我们还是通过示意图的方式来做补充说明。group跟partition的数量有关系,当消费者数量小于等于partition数量时,每个消费者都能消费到消息。

如果一个topic有4个分区,分别为p0,p1,p2,p3。现在有两个消费者组,分别是groupA和groupB。一个消费者就是一个消费实例,比如,C0就是在一个进程中启的一个消费者实例。因为这个topic有4个分区,groupA有4个消费者,则每个消费者被分配到一个对应的分区上消费,假如,这个分组又启了一个消费者consumer1,即消费者数量大于分区数量,则这个消费者不会读到消息。

而groupB只有两个消费者,则每一个消费者分别获取两个分区上的消息,如果,groupB只有一个消费者,那么,所有分区上的消息都只有这一个消费者获取。

开源项目kafka-go的使用

kafka-go是一个用go语言开发的kafka客户端。

生产者

我们看一个基于kafka-go实现的生产者的示例:

package mainimport ("context""fmt""log""time""github.com/segmentio/kafka-go"
)func main() {// 创建Kafka写入器配置writerConfig := kafka.WriterConfig{Brokers:  []string{"10.10.37.100:30001"},Topic:    "my-topic",Balancer: &kafka.Hash{}, //消息分区的策略,这个策略是通过hash算法根据kafka.Message的key值来选择分区的}// 创建写入器writer := kafka.NewWriter(writerConfig)// 发送消息messages := []string{"message 1", "message 2", "message 3", "message 4", "message 5", "message 6", "message 7", "message 8", "message 9", "message 10", "message 11", "message 12", "message 13", "message 14", "message 15", "message 16"}for _, msg := range messages {time.Sleep(1 * time.Second)if err := writer.WriteMessages(context.Background(), kafka.Message{Key:   []byte(fmt.Sprintf("key-%d", time.Now().UnixNano())),Value: []byte(msg),}); err != nil {fmt.Printf("Failed to write message: %v\n", err)} else {fmt.Printf("Message written: %s\n", msg)}}// 关闭写入器if err := writer.Close(); err != nil {fmt.Printf("Failed to close writer: %v\n", err)}
}

WriterConfig中有一个字段我们在开发过程中可能会需要关注到,Balancer:这个用于把消息分发到不同的分区上。这个策略可以自定义。当然,kafka-go中提供几种常用的方法,我以示例中的hash这个为例做简要说明,我们直接看源码。生产者发送消息的逻辑主要在这个WriteMessages方法中,我们直接定位到消息分发到对应分区的逻辑。

func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {//......//忽略与分区不相关的代码,我们只关注和分区相关的逻辑balancer := w.balancer()for i, msg := range msgs {topic, err := w.chooseTopic(msg)if err != nil {return err}numPartitions, err := w.partitions(ctx, topic)if err != nil {return err}//根据msg计算把消息分发到哪个分区partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)key := topicPartition{topic:     topic,partition: int32(partition),}assignments[key] = append(assignments[key], int32(i))}batches := w.batchMessages(msgs, assignments)if w.Async {return nil}.....//忽略一些细节return werr
}

如果设置了分区策略,则以设置的分区策略进行分发消息。 

func (w *Writer) balancer() Balancer {if w.Balancer != nil {return w.Balancer}return &w.roundRobin
}

接下来,我们看一下我的示例中使用的Hash的分发消息的策略。我们看到这个方法的主要逻辑,就是根据msg中key进行hash,然后根据topic下总分区数来计算消息分发到对应的分区号。hasher.Write(msg.Key)

func (h *Hash) Balance(msg Message, partitions ...int) int {if msg.Key == nil {return h.rr.Balance(msg, partitions...)}hasher := h.Hasherif hasher != nil {h.lock.Lock()defer h.lock.Unlock()} else {hasher = fnv1aPool.Get().(hash.Hash32)defer fnv1aPool.Put(hasher)}hasher.Reset()if _, err := hasher.Write(msg.Key); err != nil {panic(err)}// uses same algorithm that Sarama's hashPartitioner uses// note the type conversions here.  if the uint32 hash code is not cast to// an int32, we do not get the same result as sarama.partition := int32(hasher.Sum32()) % int32(len(partitions))if partition < 0 {partition = -partition}return int(partition)
}

消费者

消费者没有太多的注意事项,只是如果有多个分区,想要提升并发能力,可以启多个消费者。我们直接看示例。

package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)func main() {// 创建Kafka读取器配置readerConfig := kafka.ReaderConfig{Brokers:  []string{"10.10.37.100:30001"},Topic:    "my-topic",GroupID:  "my-group",MinBytes: 10e3, // 10KBMaxBytes: 10e6, // 10MB}// 创建读取器reader := kafka.NewReader(readerConfig)// 处理信号以便优雅地关闭sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)// 读取消息for {select {case sig := <-sigChan:fmt.Printf("Caught signal %v: terminating\n", sig)returndefault:msg, err := reader.ReadMessage(context.Background())if err != nil {fmt.Printf("Failed to read message: %v\n", err)continue}fmt.Printf("当前时间:%v Message on topic: %s value: %s partion:%d\n", time.Now(), msg.Topic, string(msg.Value), msg.Partition)}}
}

创建topic

直接上示例。创建topic是个幂等操作。

package mainimport ("net""strconv""github.com/segmentio/kafka-go"
)func main() {// to create topics when auto.create.topics.enable='false'topic := "my-topic"conn, err := kafka.Dial("tcp", "10.10.37.100:30001")if err != nil {panic(err.Error())}defer conn.Close()controller, err := conn.Controller()if err != nil {panic(err.Error())}var controllerConn *kafka.ConncontrollerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err != nil {panic(err.Error())}defer controllerConn.Close()topicConfigs := []kafka.TopicConfig{{Topic:             topic,NumPartitions:     10,ReplicationFactor: 1,},}err = controllerConn.CreateTopics(topicConfigs...)if err != nil {panic(err.Error())}
}

相关文章:

kafka-go使用:以及kafka一些基本概念说明

关于kafka 作为开发人员kafka中最常关注的几个概念&#xff0c;是topic,partition和group这几个概念。topic是主题的意思&#xff0c;简单的说topic是数据主题&#xff0c;这样解释好像显得很苍白&#xff0c;只是做了个翻译。一图胜前言&#xff0c;我们还是通过图解来说明。…...

景联文科技:破解数据标注行业痛点,引领高质量AI数据服务

数据标注行业是人工智能和机器学习领域中一个非常重要的组成部分。随着AI技术的发展&#xff0c;对高质量标注数据的需求也在不断增长。 数据标注市场的痛点 1. 团队管理 在众包和转包模式下&#xff0c;管理大量的标注人员是一项挑战。 需要确保标注人员的专业性、稳定性和…...

C#获取Network的相关信息

1&#xff0c;获取网络的通断。 //方法1&#xff1a;无效果&#xff0c;并不能反映当前网络通断 bool availableSystem.Windows.Forms.SystemInformation.Network//方法2&#xff1a;通过VB获取网络状态&#xff0c;可反映当前网络通断 Microsoft.VisualBasic.Devices.Network…...

Jenkins 部署Vue项目指引: Vue项目本地跨域代理 、解决ERR_UNSAFE_PORT

文章目录 引言I Jenkins 部署Vue项目配置插件安装系统配置NodeJS安装目录和别名设置新建任务(通用类型)构建环境Build Steps(构建步骤)II nginx部署站点(端口和站点目录的映射)查找Nginx配置文件端口和站点目录的映射III Vue项目本地跨域代理,屏蔽掉后端服务API的网关IP…...

C语言电子画板

目录 开头程序程序的流程图程序的效果结尾 开头 大家好&#xff0c;我叫这是我58。今天&#xff0c;我们来看一下我用C语言编译的电子画板和与之相关的一些东西。 程序 #define _CRT_SECURE_NO_WARNINGS 1 #include <stdio.h> #include <Windows.h> int main() …...

Android Gradle开发与应用技术原理

Android Gradle开发与应用技术原理 Android Gradle开发与应用技术原理一、概述二、Gradle构建原理1. Gradle架构2. Gradle构建过程3. 构建脚本 三、Gradle插件机制四、在Android应用中实现Text-to-Speech&#xff08;TTS&#xff09;功能1. 配置Gradle依赖2. 实现TTS功能示例代…...

Midjourney入门-提示词基础撰写与公式

​ 前言 在前几篇教程里我们已经可以初步使用Midjourney进行出图了。 包括也了解了Midjourney的指令与参数。 但如果你想用Midjourney去生成各种各样高质量的图片&#xff0c; 并且生成的图片是你想要的画面内容&#xff0c;也就是更好控制生成图片的画面内容与风格&#xf…...

Apache Tomcat服务器版本号隐藏

渗透测试时发现有一台服务器的404报错页面中&#xff0c;有Apache Tomcat的版本号信息显示&#xff0c;发生了信息泄露&#xff0c;可能导致服务器被攻击。如下所示&#xff1a; 解决步骤如下&#xff1a; 1. 隐藏HTTP响应头中的Server信息 Tomcat默认会在HTTP响应头中包含S…...

【Qt】Qt编程注意事项

目录 Qr中的命名规范 Qt Creator中的快捷键 查询文档的方式 Qt窗口坐标体系 Qr中的命名规范 在学习编程语言阶段&#xff0c;给变量、函数、文件、类命名是非常有讲究的。 命名要有描述性&#xff0c;不要使用abc&#xff0c;xyz这种比较无规律的名字类描述。如果名字比较…...

在Linux系统安装Kafka

注意&#xff1a;我的是在云服务器上基于Docker配 在防火墙上放行端口号 2181(Zookeeper) 9092(Kafka) 一、先配置 Docker 守护进程&#xff08;daemon&#xff09;的镜像加速器&#xff08;registry mirrors&#xff09; sudo mkdir -p /etc/docker sudo tee /etc/docker/da…...

【CSharp】简单定义一个异步方法

【CSharp】定义一个异步方法 1.背景2.异步方法3.代码说明1.背景 相关博客: 【CSharp】使用异步事件处理程序和委托来进行异步调用 https://blog.csdn.net/jn10010537/article/details/140898179在 C# 中,异步方法和同步方法是两种执行代码的方式, 它们主要区别在于处理任务…...

贪心算法之货仓选址问题

#include<stdio.h> #include<stdlib.h> #include<math.h>//贪心算法之货仓选址问题/*** void* p是万能指针&#xff0c;可以和其它任意类型的指针进行转换&#xff0c;前提是确保转换是合法的*/ //写好用于qsort的比较函数&#xff0c;这里写的函数一般用于…...

Java网络编程——Request Response 对象

Response - 网页 上一章我们学习了 Java 中使用 Okhttp3 库请求网页或调用 API 的知识。 使用一条语句执行调用请求&#xff0c;并取得返回结果字符串&#xff1a; call.execute().body().string()execute() 方法是真正执行发送请求&#xff0c;前面的一系列代码是做前置准备…...

【代码随想录训练营第42期 Day24打卡 回溯Part3 - LeetCode 93.复原IP地址 78.子集 90.子集II

目录 一、做题心得 二、题目与题解 题目一&#xff1a;93.复原IP地址 题目链接 题解&#xff1a;回溯--分割问题 题目二&#xff1a;78.子集 题目链接 题解&#xff1a;回溯--子集问题 题目三&#xff1a;90.子集II 题目链接 题解&#xff1a;回溯--子集问题 三、小…...

python venv和virtualenv详解

一、venv简介 C:\Users\love1>python -m venv -h usage: venv [-h] [--system-site-packages] [--symlinks | --copies] [--clear] [--upgrade] [--without-pip][--prompt PROMPT] [--upgrade-deps]ENV_DIR [ENV_DIR ...]该命令用于在一个目录或者多个目录中创建一个虚拟的…...

《征服数据结构》树堆(Treap)

摘要&#xff1a; 1&#xff0c;Treap的介绍 2&#xff0c;Treap节点的插入 3&#xff0c;Treap节点的删除 4&#xff0c;Treap和笛卡尔树的区别 1&#xff0c;Treap的介绍 Treap又叫树堆&#xff0c;属于一种自平衡二叉搜索树&#xff0c;是由单词Tree和Heap构成&#xff0c;是…...

论文笔记:OneBit: Towards Extremely Low-bit Large Language Models

202402 arxiv 1 背景 模型量化主要通过把模型的线性层【nn.Linear】&#xff08;Embedding 层和 Lm_head 层除外&#xff09;转化为低精度表示实现空间压缩 此前工作的基础是利用 Round-To-Nearest&#xff08;RTN&#xff09;方法把高精度浮点数近似映射到附近的整数网格然而…...

英语文化中的音乐分类及其发展历史(Classical、Jazz、Rock、Pop、Electronic、Country、RB、Hip-Hop)

文章目录 英语文化中的音乐分类及其发展历史1. 简介2. 古典音乐 (Classical Music)2.1 起源与发展2.2 技术与风格 3. 爵士音乐 (Jazz Music)3.1 起源与发展3.2 技术与风格 4. 摇滚音乐 (Rock Music)&#xff08;Rock and roll&#xff09;4.1 起源与发展4.2 技术与风格 5. 蓝调…...

C语言-栈、队列、二叉树

12 栈、队列、二叉树 目录 12 栈、队列、二叉树 一、栈、队列、二叉树是什么&#xff1f; 二、栈 1. 特点&#xff1a;先进后出 -- 有底的盒子 2. 使用场景&#xff1a;函数调用 -- 中断机制 3. 实现栈的形式&#xff1a; 三、队列 1. 特点&#xff1a;先进先出 -- 水…...

pinia-plugin-persistedstate 插件不生效

引入使用该插件使用时发现不生效 原因&#xff1a;pinia实例调用顺序不当 将&#xff1a; // import ./assets/main.css import { createApp } from vue import { createPinia } from pinia import piniaPluginPersistedstate from pinia-plugin-persistedstate import App fr…...

低成本自动化方案:OpenClaw+自部署千问3.5-27B替代ChatGPT API调用

低成本自动化方案&#xff1a;OpenClaw自部署千问3.5-27B替代ChatGPT API调用 1. 为什么选择本地模型OpenClaw组合 去年我用ChatGPT API开发自动化脚本时&#xff0c;发现一个致命问题&#xff1a;当任务需要连续调用多个API时&#xff08;比如先搜索资料再整理成报告&#x…...

Mac 本地轻量级 K8s 开发环境实战指南

1. 为什么要在Mac上搭建轻量级K8s环境&#xff1f; 作为开发者&#xff0c;我们经常需要在本地测试Kubernetes应用&#xff0c;但传统方案要么太重&#xff08;如完整K8s集群&#xff09;&#xff0c;要么太慢&#xff08;如云环境&#xff09;。在Mac上搭建轻量级K8s环境可以完…...

别再只盯着CAN了!聊聊LIN总线在低成本IoT传感器网络里的那些‘骚操作’

LIN总线在低成本IoT传感器网络中的创新实践 当谈到工业物联网和传感器网络通信协议时&#xff0c;大多数人会立刻想到CAN、Modbus或以太网协议。但有一个被严重低估的选项正在悄然崛起——LIN总线。这个原本为汽车电子设计的轻量级协议&#xff0c;凭借其独特的成本优势和简洁架…...

机器人控制入门:用Pi0具身智能v1镜像5分钟搭建你的第一个动作预测Demo

机器人控制入门&#xff1a;用Pi0具身智能v1镜像5分钟搭建你的第一个动作预测Demo 1. 快速部署Pi0具身智能镜像 1.1 选择并启动镜像 在云平台镜像市场中搜索并选择"ins-pi0-independent-v1"镜像&#xff0c;点击"部署实例"按钮。首次启动大约需要1-2分钟…...

SEO 推广与传统广告推广有什么区别

SEO 推广与传统广告推广有什么区别 在当今的数字化时代&#xff0c;企业如何有效地推广自己的产品和服务成为了一个亟待解决的问题。两种常见的推广方式——SEO 推广与传统广告推广——各有优劣&#xff0c;企业需要根据自身的需求和市场环境进行选择。本文将详细探讨SEO推广和…...

Nano-Banana与PyTorch Lightning集成:简化深度学习流程

Nano-Banana与PyTorch Lightning集成&#xff1a;简化深度学习流程 用更少的代码&#xff0c;做更多的事情——这就是PyTorch Lightning的魅力所在 如果你正在使用Nano-Banana进行深度学习项目&#xff0c;可能会发现编写训练循环、管理设备、处理日志记录这些重复性工作相当耗…...

Qwen3.5-9B多模态能力解析:图文输入联合建模+VL变体兼容性说明

Qwen3.5-9B多模态能力解析&#xff1a;图文输入联合建模VL变体兼容性说明 1. 模型概述与核心能力 Qwen3.5-9B是一款拥有90亿参数的开源大语言模型&#xff0c;在多模态理解和长上下文处理方面展现出卓越性能。作为当前开源社区的重要贡献&#xff0c;该模型特别强化了图文联合…...

为什么钉钉、飞书、企微都在做 CLI?这个开源项目给出了最极致的答案

❝AI Agent 很聪明&#xff0c;但面对真实的专业软件&#xff0c;它就是个"睁眼瞎"。CLI-Anything 说&#xff1a;我来治。❞先说一个扎心的事实2026年了&#xff0c;AI Agent 能写代码、能做分析、能聊天能画画——但你让它打开 Blender 建个模&#xff1f;让它用 G…...

OpenClaw自动化边界:千问3.5-27B不适合处理的五类任务

OpenClaw自动化边界&#xff1a;千问3.5-27B不适合处理的五类任务 1. 为什么需要明确自动化边界&#xff1f; 去年冬天&#xff0c;我花了整整三天时间调试一个OpenClaw自动化流程——让AI帮我整理电脑里积压的200GB设计素材。当看到脚本误删了未备份的客户源文件时&#xff…...

JeecgBoot启动配置

一、引入maven指定自己的maven仓库 二、指定JDK 记得apply&#xff01;&#xff01;&#xff01;&#xff01;然后OK 三、配置MySQL数据库(尽量≥5.7版本) 四、运行db文件夹下的SQL文件 五、后端本地环境&#xff08;application-dev.yml&#xff09;指定好数据源 1、M…...