kafka-go使用:以及kafka一些基本概念说明
关于kafka
作为开发人员kafka中最常关注的几个概念,是topic,partition和group这几个概念。topic是主题的意思,简单的说topic是数据主题,这样解释好像显得很苍白,只是做了个翻译。一图胜前言,我们还是通过图解来说明。
生产者负责写数据,一个topic可以有多个分区。如下图所示,生产者写数据的示意。从这个图中,我们可以得出一个很重要的信息:分区有序,即消息在某个分区上是有顺序的,而全局是没有顺序的。这个意味着如果我们需要保证顺序,我们在写消息时需要往同一个分区中写数据。比如,我们有一个场景,有一个订单。首先,创建支付订单,发送一个kafka消息,然后,实际支付,发送一个kafka消息,最后,又想退款,又发起了退款,又发送了一个kafka消息。如果,这三个消息在不同的分区上,我们就无法保证,我们按照创建支付订单——支付——退款这个顺序执行。依据分区有序的特点,我们可以把跟这个订单相关的所有操作的消息都写到一个分区上,比如,可以通过根据订单id进行hash。

group只跟消费者有关系,消费者通过group进行标识,一个消费者实例表示一个消费者,消费者实例可以在不同的线程中开启,也可以在不同的进程中开启。这可以提升消费的并发能力。那么,这是否意味着可以无限开启消费者实例,以提升消费者消费消息的速度呢?
这就涉及到消费的逻辑,我先给出结论。接下来,我们还是通过示意图的方式来做补充说明。group跟partition的数量有关系,当消费者数量小于等于partition数量时,每个消费者都能消费到消息。
如果一个topic有4个分区,分别为p0,p1,p2,p3。现在有两个消费者组,分别是groupA和groupB。一个消费者就是一个消费实例,比如,C0就是在一个进程中启的一个消费者实例。因为这个topic有4个分区,groupA有4个消费者,则每个消费者被分配到一个对应的分区上消费,假如,这个分组又启了一个消费者consumer1,即消费者数量大于分区数量,则这个消费者不会读到消息。
而groupB只有两个消费者,则每一个消费者分别获取两个分区上的消息,如果,groupB只有一个消费者,那么,所有分区上的消息都只有这一个消费者获取。

开源项目kafka-go的使用
kafka-go是一个用go语言开发的kafka客户端。
生产者
我们看一个基于kafka-go实现的生产者的示例:
package mainimport ("context""fmt""log""time""github.com/segmentio/kafka-go"
)func main() {// 创建Kafka写入器配置writerConfig := kafka.WriterConfig{Brokers: []string{"10.10.37.100:30001"},Topic: "my-topic",Balancer: &kafka.Hash{}, //消息分区的策略,这个策略是通过hash算法根据kafka.Message的key值来选择分区的}// 创建写入器writer := kafka.NewWriter(writerConfig)// 发送消息messages := []string{"message 1", "message 2", "message 3", "message 4", "message 5", "message 6", "message 7", "message 8", "message 9", "message 10", "message 11", "message 12", "message 13", "message 14", "message 15", "message 16"}for _, msg := range messages {time.Sleep(1 * time.Second)if err := writer.WriteMessages(context.Background(), kafka.Message{Key: []byte(fmt.Sprintf("key-%d", time.Now().UnixNano())),Value: []byte(msg),}); err != nil {fmt.Printf("Failed to write message: %v\n", err)} else {fmt.Printf("Message written: %s\n", msg)}}// 关闭写入器if err := writer.Close(); err != nil {fmt.Printf("Failed to close writer: %v\n", err)}
}
WriterConfig中有一个字段我们在开发过程中可能会需要关注到,Balancer:这个用于把消息分发到不同的分区上。这个策略可以自定义。当然,kafka-go中提供几种常用的方法,我以示例中的hash这个为例做简要说明,我们直接看源码。生产者发送消息的逻辑主要在这个WriteMessages方法中,我们直接定位到消息分发到对应分区的逻辑。
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {//......//忽略与分区不相关的代码,我们只关注和分区相关的逻辑balancer := w.balancer()for i, msg := range msgs {topic, err := w.chooseTopic(msg)if err != nil {return err}numPartitions, err := w.partitions(ctx, topic)if err != nil {return err}//根据msg计算把消息分发到哪个分区partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)key := topicPartition{topic: topic,partition: int32(partition),}assignments[key] = append(assignments[key], int32(i))}batches := w.batchMessages(msgs, assignments)if w.Async {return nil}.....//忽略一些细节return werr
}
如果设置了分区策略,则以设置的分区策略进行分发消息。
func (w *Writer) balancer() Balancer {if w.Balancer != nil {return w.Balancer}return &w.roundRobin
}
接下来,我们看一下我的示例中使用的Hash的分发消息的策略。我们看到这个方法的主要逻辑,就是根据msg中key进行hash,然后根据topic下总分区数来计算消息分发到对应的分区号。hasher.Write(msg.Key)
func (h *Hash) Balance(msg Message, partitions ...int) int {if msg.Key == nil {return h.rr.Balance(msg, partitions...)}hasher := h.Hasherif hasher != nil {h.lock.Lock()defer h.lock.Unlock()} else {hasher = fnv1aPool.Get().(hash.Hash32)defer fnv1aPool.Put(hasher)}hasher.Reset()if _, err := hasher.Write(msg.Key); err != nil {panic(err)}// uses same algorithm that Sarama's hashPartitioner uses// note the type conversions here. if the uint32 hash code is not cast to// an int32, we do not get the same result as sarama.partition := int32(hasher.Sum32()) % int32(len(partitions))if partition < 0 {partition = -partition}return int(partition)
}
消费者
消费者没有太多的注意事项,只是如果有多个分区,想要提升并发能力,可以启多个消费者。我们直接看示例。
package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)func main() {// 创建Kafka读取器配置readerConfig := kafka.ReaderConfig{Brokers: []string{"10.10.37.100:30001"},Topic: "my-topic",GroupID: "my-group",MinBytes: 10e3, // 10KBMaxBytes: 10e6, // 10MB}// 创建读取器reader := kafka.NewReader(readerConfig)// 处理信号以便优雅地关闭sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)// 读取消息for {select {case sig := <-sigChan:fmt.Printf("Caught signal %v: terminating\n", sig)returndefault:msg, err := reader.ReadMessage(context.Background())if err != nil {fmt.Printf("Failed to read message: %v\n", err)continue}fmt.Printf("当前时间:%v Message on topic: %s value: %s partion:%d\n", time.Now(), msg.Topic, string(msg.Value), msg.Partition)}}
}
创建topic
直接上示例。创建topic是个幂等操作。
package mainimport ("net""strconv""github.com/segmentio/kafka-go"
)func main() {// to create topics when auto.create.topics.enable='false'topic := "my-topic"conn, err := kafka.Dial("tcp", "10.10.37.100:30001")if err != nil {panic(err.Error())}defer conn.Close()controller, err := conn.Controller()if err != nil {panic(err.Error())}var controllerConn *kafka.ConncontrollerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err != nil {panic(err.Error())}defer controllerConn.Close()topicConfigs := []kafka.TopicConfig{{Topic: topic,NumPartitions: 10,ReplicationFactor: 1,},}err = controllerConn.CreateTopics(topicConfigs...)if err != nil {panic(err.Error())}
}
相关文章:
kafka-go使用:以及kafka一些基本概念说明
关于kafka 作为开发人员kafka中最常关注的几个概念,是topic,partition和group这几个概念。topic是主题的意思,简单的说topic是数据主题,这样解释好像显得很苍白,只是做了个翻译。一图胜前言,我们还是通过图解来说明。…...
景联文科技:破解数据标注行业痛点,引领高质量AI数据服务
数据标注行业是人工智能和机器学习领域中一个非常重要的组成部分。随着AI技术的发展,对高质量标注数据的需求也在不断增长。 数据标注市场的痛点 1. 团队管理 在众包和转包模式下,管理大量的标注人员是一项挑战。 需要确保标注人员的专业性、稳定性和…...
C#获取Network的相关信息
1,获取网络的通断。 //方法1:无效果,并不能反映当前网络通断 bool availableSystem.Windows.Forms.SystemInformation.Network//方法2:通过VB获取网络状态,可反映当前网络通断 Microsoft.VisualBasic.Devices.Network…...
Jenkins 部署Vue项目指引: Vue项目本地跨域代理 、解决ERR_UNSAFE_PORT
文章目录 引言I Jenkins 部署Vue项目配置插件安装系统配置NodeJS安装目录和别名设置新建任务(通用类型)构建环境Build Steps(构建步骤)II nginx部署站点(端口和站点目录的映射)查找Nginx配置文件端口和站点目录的映射III Vue项目本地跨域代理,屏蔽掉后端服务API的网关IP…...
C语言电子画板
目录 开头程序程序的流程图程序的效果结尾 开头 大家好,我叫这是我58。今天,我们来看一下我用C语言编译的电子画板和与之相关的一些东西。 程序 #define _CRT_SECURE_NO_WARNINGS 1 #include <stdio.h> #include <Windows.h> int main() …...
Android Gradle开发与应用技术原理
Android Gradle开发与应用技术原理 Android Gradle开发与应用技术原理一、概述二、Gradle构建原理1. Gradle架构2. Gradle构建过程3. 构建脚本 三、Gradle插件机制四、在Android应用中实现Text-to-Speech(TTS)功能1. 配置Gradle依赖2. 实现TTS功能示例代…...
Midjourney入门-提示词基础撰写与公式
前言 在前几篇教程里我们已经可以初步使用Midjourney进行出图了。 包括也了解了Midjourney的指令与参数。 但如果你想用Midjourney去生成各种各样高质量的图片, 并且生成的图片是你想要的画面内容,也就是更好控制生成图片的画面内容与风格…...
Apache Tomcat服务器版本号隐藏
渗透测试时发现有一台服务器的404报错页面中,有Apache Tomcat的版本号信息显示,发生了信息泄露,可能导致服务器被攻击。如下所示: 解决步骤如下: 1. 隐藏HTTP响应头中的Server信息 Tomcat默认会在HTTP响应头中包含S…...
【Qt】Qt编程注意事项
目录 Qr中的命名规范 Qt Creator中的快捷键 查询文档的方式 Qt窗口坐标体系 Qr中的命名规范 在学习编程语言阶段,给变量、函数、文件、类命名是非常有讲究的。 命名要有描述性,不要使用abc,xyz这种比较无规律的名字类描述。如果名字比较…...
在Linux系统安装Kafka
注意:我的是在云服务器上基于Docker配 在防火墙上放行端口号 2181(Zookeeper) 9092(Kafka) 一、先配置 Docker 守护进程(daemon)的镜像加速器(registry mirrors) sudo mkdir -p /etc/docker sudo tee /etc/docker/da…...
【CSharp】简单定义一个异步方法
【CSharp】定义一个异步方法 1.背景2.异步方法3.代码说明1.背景 相关博客: 【CSharp】使用异步事件处理程序和委托来进行异步调用 https://blog.csdn.net/jn10010537/article/details/140898179在 C# 中,异步方法和同步方法是两种执行代码的方式, 它们主要区别在于处理任务…...
贪心算法之货仓选址问题
#include<stdio.h> #include<stdlib.h> #include<math.h>//贪心算法之货仓选址问题/*** void* p是万能指针,可以和其它任意类型的指针进行转换,前提是确保转换是合法的*/ //写好用于qsort的比较函数,这里写的函数一般用于…...
Java网络编程——Request Response 对象
Response - 网页 上一章我们学习了 Java 中使用 Okhttp3 库请求网页或调用 API 的知识。 使用一条语句执行调用请求,并取得返回结果字符串: call.execute().body().string()execute() 方法是真正执行发送请求,前面的一系列代码是做前置准备…...
【代码随想录训练营第42期 Day24打卡 回溯Part3 - LeetCode 93.复原IP地址 78.子集 90.子集II
目录 一、做题心得 二、题目与题解 题目一:93.复原IP地址 题目链接 题解:回溯--分割问题 题目二:78.子集 题目链接 题解:回溯--子集问题 题目三:90.子集II 题目链接 题解:回溯--子集问题 三、小…...
python venv和virtualenv详解
一、venv简介 C:\Users\love1>python -m venv -h usage: venv [-h] [--system-site-packages] [--symlinks | --copies] [--clear] [--upgrade] [--without-pip][--prompt PROMPT] [--upgrade-deps]ENV_DIR [ENV_DIR ...]该命令用于在一个目录或者多个目录中创建一个虚拟的…...
《征服数据结构》树堆(Treap)
摘要: 1,Treap的介绍 2,Treap节点的插入 3,Treap节点的删除 4,Treap和笛卡尔树的区别 1,Treap的介绍 Treap又叫树堆,属于一种自平衡二叉搜索树,是由单词Tree和Heap构成,是…...
论文笔记:OneBit: Towards Extremely Low-bit Large Language Models
202402 arxiv 1 背景 模型量化主要通过把模型的线性层【nn.Linear】(Embedding 层和 Lm_head 层除外)转化为低精度表示实现空间压缩 此前工作的基础是利用 Round-To-Nearest(RTN)方法把高精度浮点数近似映射到附近的整数网格然而…...
英语文化中的音乐分类及其发展历史(Classical、Jazz、Rock、Pop、Electronic、Country、RB、Hip-Hop)
文章目录 英语文化中的音乐分类及其发展历史1. 简介2. 古典音乐 (Classical Music)2.1 起源与发展2.2 技术与风格 3. 爵士音乐 (Jazz Music)3.1 起源与发展3.2 技术与风格 4. 摇滚音乐 (Rock Music)(Rock and roll)4.1 起源与发展4.2 技术与风格 5. 蓝调…...
C语言-栈、队列、二叉树
12 栈、队列、二叉树 目录 12 栈、队列、二叉树 一、栈、队列、二叉树是什么? 二、栈 1. 特点:先进后出 -- 有底的盒子 2. 使用场景:函数调用 -- 中断机制 3. 实现栈的形式: 三、队列 1. 特点:先进先出 -- 水…...
pinia-plugin-persistedstate 插件不生效
引入使用该插件使用时发现不生效 原因:pinia实例调用顺序不当 将: // import ./assets/main.css import { createApp } from vue import { createPinia } from pinia import piniaPluginPersistedstate from pinia-plugin-persistedstate import App fr…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
微软PowerBI考试 PL300-选择 Power BI 模型框架【附练习数据】
微软PowerBI考试 PL300-选择 Power BI 模型框架 20 多年来,Microsoft 持续对企业商业智能 (BI) 进行大量投资。 Azure Analysis Services (AAS) 和 SQL Server Analysis Services (SSAS) 基于无数企业使用的成熟的 BI 数据建模技术。 同样的技术也是 Power BI 数据…...
Java 8 Stream API 入门到实践详解
一、告别 for 循环! 传统痛点: Java 8 之前,集合操作离不开冗长的 for 循环和匿名类。例如,过滤列表中的偶数: List<Integer> list Arrays.asList(1, 2, 3, 4, 5); List<Integer> evens new ArrayList…...
MFC内存泄露
1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...
【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
【Go语言基础【12】】指针:声明、取地址、解引用
文章目录 零、概述:指针 vs. 引用(类比其他语言)一、指针基础概念二、指针声明与初始化三、指针操作符1. &:取地址(拿到内存地址)2. *:解引用(拿到值) 四、空指针&am…...
深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...
【JVM】Java虚拟机(二)——垃圾回收
目录 一、如何判断对象可以回收 (一)引用计数法 (二)可达性分析算法 二、垃圾回收算法 (一)标记清除 (二)标记整理 (三)复制 (四ÿ…...
