kafka-go使用:以及kafka一些基本概念说明
关于kafka
作为开发人员kafka中最常关注的几个概念,是topic,partition和group这几个概念。topic是主题的意思,简单的说topic是数据主题,这样解释好像显得很苍白,只是做了个翻译。一图胜前言,我们还是通过图解来说明。
生产者负责写数据,一个topic可以有多个分区。如下图所示,生产者写数据的示意。从这个图中,我们可以得出一个很重要的信息:分区有序,即消息在某个分区上是有顺序的,而全局是没有顺序的。这个意味着如果我们需要保证顺序,我们在写消息时需要往同一个分区中写数据。比如,我们有一个场景,有一个订单。首先,创建支付订单,发送一个kafka消息,然后,实际支付,发送一个kafka消息,最后,又想退款,又发起了退款,又发送了一个kafka消息。如果,这三个消息在不同的分区上,我们就无法保证,我们按照创建支付订单——支付——退款这个顺序执行。依据分区有序的特点,我们可以把跟这个订单相关的所有操作的消息都写到一个分区上,比如,可以通过根据订单id进行hash。

group只跟消费者有关系,消费者通过group进行标识,一个消费者实例表示一个消费者,消费者实例可以在不同的线程中开启,也可以在不同的进程中开启。这可以提升消费的并发能力。那么,这是否意味着可以无限开启消费者实例,以提升消费者消费消息的速度呢?
这就涉及到消费的逻辑,我先给出结论。接下来,我们还是通过示意图的方式来做补充说明。group跟partition的数量有关系,当消费者数量小于等于partition数量时,每个消费者都能消费到消息。
如果一个topic有4个分区,分别为p0,p1,p2,p3。现在有两个消费者组,分别是groupA和groupB。一个消费者就是一个消费实例,比如,C0就是在一个进程中启的一个消费者实例。因为这个topic有4个分区,groupA有4个消费者,则每个消费者被分配到一个对应的分区上消费,假如,这个分组又启了一个消费者consumer1,即消费者数量大于分区数量,则这个消费者不会读到消息。
而groupB只有两个消费者,则每一个消费者分别获取两个分区上的消息,如果,groupB只有一个消费者,那么,所有分区上的消息都只有这一个消费者获取。

开源项目kafka-go的使用
kafka-go是一个用go语言开发的kafka客户端。
生产者
我们看一个基于kafka-go实现的生产者的示例:
package mainimport ("context""fmt""log""time""github.com/segmentio/kafka-go"
)func main() {// 创建Kafka写入器配置writerConfig := kafka.WriterConfig{Brokers: []string{"10.10.37.100:30001"},Topic: "my-topic",Balancer: &kafka.Hash{}, //消息分区的策略,这个策略是通过hash算法根据kafka.Message的key值来选择分区的}// 创建写入器writer := kafka.NewWriter(writerConfig)// 发送消息messages := []string{"message 1", "message 2", "message 3", "message 4", "message 5", "message 6", "message 7", "message 8", "message 9", "message 10", "message 11", "message 12", "message 13", "message 14", "message 15", "message 16"}for _, msg := range messages {time.Sleep(1 * time.Second)if err := writer.WriteMessages(context.Background(), kafka.Message{Key: []byte(fmt.Sprintf("key-%d", time.Now().UnixNano())),Value: []byte(msg),}); err != nil {fmt.Printf("Failed to write message: %v\n", err)} else {fmt.Printf("Message written: %s\n", msg)}}// 关闭写入器if err := writer.Close(); err != nil {fmt.Printf("Failed to close writer: %v\n", err)}
}
WriterConfig中有一个字段我们在开发过程中可能会需要关注到,Balancer:这个用于把消息分发到不同的分区上。这个策略可以自定义。当然,kafka-go中提供几种常用的方法,我以示例中的hash这个为例做简要说明,我们直接看源码。生产者发送消息的逻辑主要在这个WriteMessages方法中,我们直接定位到消息分发到对应分区的逻辑。
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {//......//忽略与分区不相关的代码,我们只关注和分区相关的逻辑balancer := w.balancer()for i, msg := range msgs {topic, err := w.chooseTopic(msg)if err != nil {return err}numPartitions, err := w.partitions(ctx, topic)if err != nil {return err}//根据msg计算把消息分发到哪个分区partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)key := topicPartition{topic: topic,partition: int32(partition),}assignments[key] = append(assignments[key], int32(i))}batches := w.batchMessages(msgs, assignments)if w.Async {return nil}.....//忽略一些细节return werr
}
如果设置了分区策略,则以设置的分区策略进行分发消息。
func (w *Writer) balancer() Balancer {if w.Balancer != nil {return w.Balancer}return &w.roundRobin
}
接下来,我们看一下我的示例中使用的Hash的分发消息的策略。我们看到这个方法的主要逻辑,就是根据msg中key进行hash,然后根据topic下总分区数来计算消息分发到对应的分区号。hasher.Write(msg.Key)
func (h *Hash) Balance(msg Message, partitions ...int) int {if msg.Key == nil {return h.rr.Balance(msg, partitions...)}hasher := h.Hasherif hasher != nil {h.lock.Lock()defer h.lock.Unlock()} else {hasher = fnv1aPool.Get().(hash.Hash32)defer fnv1aPool.Put(hasher)}hasher.Reset()if _, err := hasher.Write(msg.Key); err != nil {panic(err)}// uses same algorithm that Sarama's hashPartitioner uses// note the type conversions here. if the uint32 hash code is not cast to// an int32, we do not get the same result as sarama.partition := int32(hasher.Sum32()) % int32(len(partitions))if partition < 0 {partition = -partition}return int(partition)
}
消费者
消费者没有太多的注意事项,只是如果有多个分区,想要提升并发能力,可以启多个消费者。我们直接看示例。
package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)func main() {// 创建Kafka读取器配置readerConfig := kafka.ReaderConfig{Brokers: []string{"10.10.37.100:30001"},Topic: "my-topic",GroupID: "my-group",MinBytes: 10e3, // 10KBMaxBytes: 10e6, // 10MB}// 创建读取器reader := kafka.NewReader(readerConfig)// 处理信号以便优雅地关闭sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)// 读取消息for {select {case sig := <-sigChan:fmt.Printf("Caught signal %v: terminating\n", sig)returndefault:msg, err := reader.ReadMessage(context.Background())if err != nil {fmt.Printf("Failed to read message: %v\n", err)continue}fmt.Printf("当前时间:%v Message on topic: %s value: %s partion:%d\n", time.Now(), msg.Topic, string(msg.Value), msg.Partition)}}
}
创建topic
直接上示例。创建topic是个幂等操作。
package mainimport ("net""strconv""github.com/segmentio/kafka-go"
)func main() {// to create topics when auto.create.topics.enable='false'topic := "my-topic"conn, err := kafka.Dial("tcp", "10.10.37.100:30001")if err != nil {panic(err.Error())}defer conn.Close()controller, err := conn.Controller()if err != nil {panic(err.Error())}var controllerConn *kafka.ConncontrollerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err != nil {panic(err.Error())}defer controllerConn.Close()topicConfigs := []kafka.TopicConfig{{Topic: topic,NumPartitions: 10,ReplicationFactor: 1,},}err = controllerConn.CreateTopics(topicConfigs...)if err != nil {panic(err.Error())}
}
相关文章:
kafka-go使用:以及kafka一些基本概念说明
关于kafka 作为开发人员kafka中最常关注的几个概念,是topic,partition和group这几个概念。topic是主题的意思,简单的说topic是数据主题,这样解释好像显得很苍白,只是做了个翻译。一图胜前言,我们还是通过图解来说明。…...
景联文科技:破解数据标注行业痛点,引领高质量AI数据服务
数据标注行业是人工智能和机器学习领域中一个非常重要的组成部分。随着AI技术的发展,对高质量标注数据的需求也在不断增长。 数据标注市场的痛点 1. 团队管理 在众包和转包模式下,管理大量的标注人员是一项挑战。 需要确保标注人员的专业性、稳定性和…...
C#获取Network的相关信息
1,获取网络的通断。 //方法1:无效果,并不能反映当前网络通断 bool availableSystem.Windows.Forms.SystemInformation.Network//方法2:通过VB获取网络状态,可反映当前网络通断 Microsoft.VisualBasic.Devices.Network…...
Jenkins 部署Vue项目指引: Vue项目本地跨域代理 、解决ERR_UNSAFE_PORT
文章目录 引言I Jenkins 部署Vue项目配置插件安装系统配置NodeJS安装目录和别名设置新建任务(通用类型)构建环境Build Steps(构建步骤)II nginx部署站点(端口和站点目录的映射)查找Nginx配置文件端口和站点目录的映射III Vue项目本地跨域代理,屏蔽掉后端服务API的网关IP…...
C语言电子画板
目录 开头程序程序的流程图程序的效果结尾 开头 大家好,我叫这是我58。今天,我们来看一下我用C语言编译的电子画板和与之相关的一些东西。 程序 #define _CRT_SECURE_NO_WARNINGS 1 #include <stdio.h> #include <Windows.h> int main() …...
Android Gradle开发与应用技术原理
Android Gradle开发与应用技术原理 Android Gradle开发与应用技术原理一、概述二、Gradle构建原理1. Gradle架构2. Gradle构建过程3. 构建脚本 三、Gradle插件机制四、在Android应用中实现Text-to-Speech(TTS)功能1. 配置Gradle依赖2. 实现TTS功能示例代…...
Midjourney入门-提示词基础撰写与公式
前言 在前几篇教程里我们已经可以初步使用Midjourney进行出图了。 包括也了解了Midjourney的指令与参数。 但如果你想用Midjourney去生成各种各样高质量的图片, 并且生成的图片是你想要的画面内容,也就是更好控制生成图片的画面内容与风格…...
Apache Tomcat服务器版本号隐藏
渗透测试时发现有一台服务器的404报错页面中,有Apache Tomcat的版本号信息显示,发生了信息泄露,可能导致服务器被攻击。如下所示: 解决步骤如下: 1. 隐藏HTTP响应头中的Server信息 Tomcat默认会在HTTP响应头中包含S…...
【Qt】Qt编程注意事项
目录 Qr中的命名规范 Qt Creator中的快捷键 查询文档的方式 Qt窗口坐标体系 Qr中的命名规范 在学习编程语言阶段,给变量、函数、文件、类命名是非常有讲究的。 命名要有描述性,不要使用abc,xyz这种比较无规律的名字类描述。如果名字比较…...
在Linux系统安装Kafka
注意:我的是在云服务器上基于Docker配 在防火墙上放行端口号 2181(Zookeeper) 9092(Kafka) 一、先配置 Docker 守护进程(daemon)的镜像加速器(registry mirrors) sudo mkdir -p /etc/docker sudo tee /etc/docker/da…...
【CSharp】简单定义一个异步方法
【CSharp】定义一个异步方法 1.背景2.异步方法3.代码说明1.背景 相关博客: 【CSharp】使用异步事件处理程序和委托来进行异步调用 https://blog.csdn.net/jn10010537/article/details/140898179在 C# 中,异步方法和同步方法是两种执行代码的方式, 它们主要区别在于处理任务…...
贪心算法之货仓选址问题
#include<stdio.h> #include<stdlib.h> #include<math.h>//贪心算法之货仓选址问题/*** void* p是万能指针,可以和其它任意类型的指针进行转换,前提是确保转换是合法的*/ //写好用于qsort的比较函数,这里写的函数一般用于…...
Java网络编程——Request Response 对象
Response - 网页 上一章我们学习了 Java 中使用 Okhttp3 库请求网页或调用 API 的知识。 使用一条语句执行调用请求,并取得返回结果字符串: call.execute().body().string()execute() 方法是真正执行发送请求,前面的一系列代码是做前置准备…...
【代码随想录训练营第42期 Day24打卡 回溯Part3 - LeetCode 93.复原IP地址 78.子集 90.子集II
目录 一、做题心得 二、题目与题解 题目一:93.复原IP地址 题目链接 题解:回溯--分割问题 题目二:78.子集 题目链接 题解:回溯--子集问题 题目三:90.子集II 题目链接 题解:回溯--子集问题 三、小…...
python venv和virtualenv详解
一、venv简介 C:\Users\love1>python -m venv -h usage: venv [-h] [--system-site-packages] [--symlinks | --copies] [--clear] [--upgrade] [--without-pip][--prompt PROMPT] [--upgrade-deps]ENV_DIR [ENV_DIR ...]该命令用于在一个目录或者多个目录中创建一个虚拟的…...
《征服数据结构》树堆(Treap)
摘要: 1,Treap的介绍 2,Treap节点的插入 3,Treap节点的删除 4,Treap和笛卡尔树的区别 1,Treap的介绍 Treap又叫树堆,属于一种自平衡二叉搜索树,是由单词Tree和Heap构成,是…...
论文笔记:OneBit: Towards Extremely Low-bit Large Language Models
202402 arxiv 1 背景 模型量化主要通过把模型的线性层【nn.Linear】(Embedding 层和 Lm_head 层除外)转化为低精度表示实现空间压缩 此前工作的基础是利用 Round-To-Nearest(RTN)方法把高精度浮点数近似映射到附近的整数网格然而…...
英语文化中的音乐分类及其发展历史(Classical、Jazz、Rock、Pop、Electronic、Country、RB、Hip-Hop)
文章目录 英语文化中的音乐分类及其发展历史1. 简介2. 古典音乐 (Classical Music)2.1 起源与发展2.2 技术与风格 3. 爵士音乐 (Jazz Music)3.1 起源与发展3.2 技术与风格 4. 摇滚音乐 (Rock Music)(Rock and roll)4.1 起源与发展4.2 技术与风格 5. 蓝调…...
C语言-栈、队列、二叉树
12 栈、队列、二叉树 目录 12 栈、队列、二叉树 一、栈、队列、二叉树是什么? 二、栈 1. 特点:先进后出 -- 有底的盒子 2. 使用场景:函数调用 -- 中断机制 3. 实现栈的形式: 三、队列 1. 特点:先进先出 -- 水…...
pinia-plugin-persistedstate 插件不生效
引入使用该插件使用时发现不生效 原因:pinia实例调用顺序不当 将: // import ./assets/main.css import { createApp } from vue import { createPinia } from pinia import piniaPluginPersistedstate from pinia-plugin-persistedstate import App fr…...
日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻
在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
进程地址空间(比特课总结)
一、进程地址空间 1. 环境变量 1 )⽤户级环境变量与系统级环境变量 全局属性:环境变量具有全局属性,会被⼦进程继承。例如当bash启动⼦进程时,环 境变量会⾃动传递给⼦进程。 本地变量限制:本地变量只在当前进程(ba…...
【Java学习笔记】Arrays类
Arrays 类 1. 导入包:import java.util.Arrays 2. 常用方法一览表 方法描述Arrays.toString()返回数组的字符串形式Arrays.sort()排序(自然排序和定制排序)Arrays.binarySearch()通过二分搜索法进行查找(前提:数组是…...
【机器视觉】单目测距——运动结构恢复
ps:图是随便找的,为了凑个封面 前言 在前面对光流法进行进一步改进,希望将2D光流推广至3D场景流时,发现2D转3D过程中存在尺度歧义问题,需要补全摄像头拍摄图像中缺失的深度信息,否则解空间不收敛…...
分布式增量爬虫实现方案
之前我们在讨论的是分布式爬虫如何实现增量爬取。增量爬虫的目标是只爬取新产生或发生变化的页面,避免重复抓取,以节省资源和时间。 在分布式环境下,增量爬虫的实现需要考虑多个爬虫节点之间的协调和去重。 另一种思路:将增量判…...
2025季度云服务器排行榜
在全球云服务器市场,各厂商的排名和地位并非一成不变,而是由其独特的优势、战略布局和市场适应性共同决定的。以下是根据2025年市场趋势,对主要云服务器厂商在排行榜中占据重要位置的原因和优势进行深度分析: 一、全球“三巨头”…...
动态 Web 开发技术入门篇
一、HTTP 协议核心 1.1 HTTP 基础 协议全称 :HyperText Transfer Protocol(超文本传输协议) 默认端口 :HTTP 使用 80 端口,HTTPS 使用 443 端口。 请求方法 : GET :用于获取资源,…...
AI+无人机如何守护濒危物种?YOLOv8实现95%精准识别
【导读】 野生动物监测在理解和保护生态系统中发挥着至关重要的作用。然而,传统的野生动物观察方法往往耗时耗力、成本高昂且范围有限。无人机的出现为野生动物监测提供了有前景的替代方案,能够实现大范围覆盖并远程采集数据。尽管具备这些优势…...
基于Java+VUE+MariaDB实现(Web)仿小米商城
仿小米商城 环境安装 nodejs maven JDK11 运行 mvn clean install -DskipTestscd adminmvn spring-boot:runcd ../webmvn spring-boot:runcd ../xiaomi-store-admin-vuenpm installnpm run servecd ../xiaomi-store-vuenpm installnpm run serve 注意:运行前…...
