当前位置: 首页 > news >正文

kafka-go使用:以及kafka一些基本概念说明

关于kafka

作为开发人员kafka中最常关注的几个概念,是topic,partition和group这几个概念。topic是主题的意思,简单的说topic是数据主题,这样解释好像显得很苍白,只是做了个翻译。一图胜前言,我们还是通过图解来说明。

生产者负责写数据,一个topic可以有多个分区。如下图所示,生产者写数据的示意。从这个图中,我们可以得出一个很重要的信息:分区有序,即消息在某个分区上是有顺序的,而全局是没有顺序的。这个意味着如果我们需要保证顺序,我们在写消息时需要往同一个分区中写数据。比如,我们有一个场景,有一个订单。首先,创建支付订单,发送一个kafka消息,然后,实际支付,发送一个kafka消息,最后,又想退款,又发起了退款,又发送了一个kafka消息。如果,这三个消息在不同的分区上,我们就无法保证,我们按照创建支付订单——支付——退款这个顺序执行。依据分区有序的特点,我们可以把跟这个订单相关的所有操作的消息都写到一个分区上,比如,可以通过根据订单id进行hash。

group只跟消费者有关系,消费者通过group进行标识,一个消费者实例表示一个消费者,消费者实例可以在不同的线程中开启,也可以在不同的进程中开启。这可以提升消费的并发能力。那么,这是否意味着可以无限开启消费者实例,以提升消费者消费消息的速度呢?

这就涉及到消费的逻辑,我先给出结论。接下来,我们还是通过示意图的方式来做补充说明。group跟partition的数量有关系,当消费者数量小于等于partition数量时,每个消费者都能消费到消息。

如果一个topic有4个分区,分别为p0,p1,p2,p3。现在有两个消费者组,分别是groupA和groupB。一个消费者就是一个消费实例,比如,C0就是在一个进程中启的一个消费者实例。因为这个topic有4个分区,groupA有4个消费者,则每个消费者被分配到一个对应的分区上消费,假如,这个分组又启了一个消费者consumer1,即消费者数量大于分区数量,则这个消费者不会读到消息。

而groupB只有两个消费者,则每一个消费者分别获取两个分区上的消息,如果,groupB只有一个消费者,那么,所有分区上的消息都只有这一个消费者获取。

开源项目kafka-go的使用

kafka-go是一个用go语言开发的kafka客户端。

生产者

我们看一个基于kafka-go实现的生产者的示例:

package mainimport ("context""fmt""log""time""github.com/segmentio/kafka-go"
)func main() {// 创建Kafka写入器配置writerConfig := kafka.WriterConfig{Brokers:  []string{"10.10.37.100:30001"},Topic:    "my-topic",Balancer: &kafka.Hash{}, //消息分区的策略,这个策略是通过hash算法根据kafka.Message的key值来选择分区的}// 创建写入器writer := kafka.NewWriter(writerConfig)// 发送消息messages := []string{"message 1", "message 2", "message 3", "message 4", "message 5", "message 6", "message 7", "message 8", "message 9", "message 10", "message 11", "message 12", "message 13", "message 14", "message 15", "message 16"}for _, msg := range messages {time.Sleep(1 * time.Second)if err := writer.WriteMessages(context.Background(), kafka.Message{Key:   []byte(fmt.Sprintf("key-%d", time.Now().UnixNano())),Value: []byte(msg),}); err != nil {fmt.Printf("Failed to write message: %v\n", err)} else {fmt.Printf("Message written: %s\n", msg)}}// 关闭写入器if err := writer.Close(); err != nil {fmt.Printf("Failed to close writer: %v\n", err)}
}

WriterConfig中有一个字段我们在开发过程中可能会需要关注到,Balancer:这个用于把消息分发到不同的分区上。这个策略可以自定义。当然,kafka-go中提供几种常用的方法,我以示例中的hash这个为例做简要说明,我们直接看源码。生产者发送消息的逻辑主要在这个WriteMessages方法中,我们直接定位到消息分发到对应分区的逻辑。

func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {//......//忽略与分区不相关的代码,我们只关注和分区相关的逻辑balancer := w.balancer()for i, msg := range msgs {topic, err := w.chooseTopic(msg)if err != nil {return err}numPartitions, err := w.partitions(ctx, topic)if err != nil {return err}//根据msg计算把消息分发到哪个分区partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)key := topicPartition{topic:     topic,partition: int32(partition),}assignments[key] = append(assignments[key], int32(i))}batches := w.batchMessages(msgs, assignments)if w.Async {return nil}.....//忽略一些细节return werr
}

如果设置了分区策略,则以设置的分区策略进行分发消息。 

func (w *Writer) balancer() Balancer {if w.Balancer != nil {return w.Balancer}return &w.roundRobin
}

接下来,我们看一下我的示例中使用的Hash的分发消息的策略。我们看到这个方法的主要逻辑,就是根据msg中key进行hash,然后根据topic下总分区数来计算消息分发到对应的分区号。hasher.Write(msg.Key)

func (h *Hash) Balance(msg Message, partitions ...int) int {if msg.Key == nil {return h.rr.Balance(msg, partitions...)}hasher := h.Hasherif hasher != nil {h.lock.Lock()defer h.lock.Unlock()} else {hasher = fnv1aPool.Get().(hash.Hash32)defer fnv1aPool.Put(hasher)}hasher.Reset()if _, err := hasher.Write(msg.Key); err != nil {panic(err)}// uses same algorithm that Sarama's hashPartitioner uses// note the type conversions here.  if the uint32 hash code is not cast to// an int32, we do not get the same result as sarama.partition := int32(hasher.Sum32()) % int32(len(partitions))if partition < 0 {partition = -partition}return int(partition)
}

消费者

消费者没有太多的注意事项,只是如果有多个分区,想要提升并发能力,可以启多个消费者。我们直接看示例。

package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)func main() {// 创建Kafka读取器配置readerConfig := kafka.ReaderConfig{Brokers:  []string{"10.10.37.100:30001"},Topic:    "my-topic",GroupID:  "my-group",MinBytes: 10e3, // 10KBMaxBytes: 10e6, // 10MB}// 创建读取器reader := kafka.NewReader(readerConfig)// 处理信号以便优雅地关闭sigChan := make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)// 读取消息for {select {case sig := <-sigChan:fmt.Printf("Caught signal %v: terminating\n", sig)returndefault:msg, err := reader.ReadMessage(context.Background())if err != nil {fmt.Printf("Failed to read message: %v\n", err)continue}fmt.Printf("当前时间:%v Message on topic: %s value: %s partion:%d\n", time.Now(), msg.Topic, string(msg.Value), msg.Partition)}}
}

创建topic

直接上示例。创建topic是个幂等操作。

package mainimport ("net""strconv""github.com/segmentio/kafka-go"
)func main() {// to create topics when auto.create.topics.enable='false'topic := "my-topic"conn, err := kafka.Dial("tcp", "10.10.37.100:30001")if err != nil {panic(err.Error())}defer conn.Close()controller, err := conn.Controller()if err != nil {panic(err.Error())}var controllerConn *kafka.ConncontrollerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err != nil {panic(err.Error())}defer controllerConn.Close()topicConfigs := []kafka.TopicConfig{{Topic:             topic,NumPartitions:     10,ReplicationFactor: 1,},}err = controllerConn.CreateTopics(topicConfigs...)if err != nil {panic(err.Error())}
}

相关文章:

kafka-go使用:以及kafka一些基本概念说明

关于kafka 作为开发人员kafka中最常关注的几个概念&#xff0c;是topic,partition和group这几个概念。topic是主题的意思&#xff0c;简单的说topic是数据主题&#xff0c;这样解释好像显得很苍白&#xff0c;只是做了个翻译。一图胜前言&#xff0c;我们还是通过图解来说明。…...

景联文科技:破解数据标注行业痛点,引领高质量AI数据服务

数据标注行业是人工智能和机器学习领域中一个非常重要的组成部分。随着AI技术的发展&#xff0c;对高质量标注数据的需求也在不断增长。 数据标注市场的痛点 1. 团队管理 在众包和转包模式下&#xff0c;管理大量的标注人员是一项挑战。 需要确保标注人员的专业性、稳定性和…...

C#获取Network的相关信息

1&#xff0c;获取网络的通断。 //方法1&#xff1a;无效果&#xff0c;并不能反映当前网络通断 bool availableSystem.Windows.Forms.SystemInformation.Network//方法2&#xff1a;通过VB获取网络状态&#xff0c;可反映当前网络通断 Microsoft.VisualBasic.Devices.Network…...

Jenkins 部署Vue项目指引: Vue项目本地跨域代理 、解决ERR_UNSAFE_PORT

文章目录 引言I Jenkins 部署Vue项目配置插件安装系统配置NodeJS安装目录和别名设置新建任务(通用类型)构建环境Build Steps(构建步骤)II nginx部署站点(端口和站点目录的映射)查找Nginx配置文件端口和站点目录的映射III Vue项目本地跨域代理,屏蔽掉后端服务API的网关IP…...

C语言电子画板

目录 开头程序程序的流程图程序的效果结尾 开头 大家好&#xff0c;我叫这是我58。今天&#xff0c;我们来看一下我用C语言编译的电子画板和与之相关的一些东西。 程序 #define _CRT_SECURE_NO_WARNINGS 1 #include <stdio.h> #include <Windows.h> int main() …...

Android Gradle开发与应用技术原理

Android Gradle开发与应用技术原理 Android Gradle开发与应用技术原理一、概述二、Gradle构建原理1. Gradle架构2. Gradle构建过程3. 构建脚本 三、Gradle插件机制四、在Android应用中实现Text-to-Speech&#xff08;TTS&#xff09;功能1. 配置Gradle依赖2. 实现TTS功能示例代…...

Midjourney入门-提示词基础撰写与公式

​ 前言 在前几篇教程里我们已经可以初步使用Midjourney进行出图了。 包括也了解了Midjourney的指令与参数。 但如果你想用Midjourney去生成各种各样高质量的图片&#xff0c; 并且生成的图片是你想要的画面内容&#xff0c;也就是更好控制生成图片的画面内容与风格&#xf…...

Apache Tomcat服务器版本号隐藏

渗透测试时发现有一台服务器的404报错页面中&#xff0c;有Apache Tomcat的版本号信息显示&#xff0c;发生了信息泄露&#xff0c;可能导致服务器被攻击。如下所示&#xff1a; 解决步骤如下&#xff1a; 1. 隐藏HTTP响应头中的Server信息 Tomcat默认会在HTTP响应头中包含S…...

【Qt】Qt编程注意事项

目录 Qr中的命名规范 Qt Creator中的快捷键 查询文档的方式 Qt窗口坐标体系 Qr中的命名规范 在学习编程语言阶段&#xff0c;给变量、函数、文件、类命名是非常有讲究的。 命名要有描述性&#xff0c;不要使用abc&#xff0c;xyz这种比较无规律的名字类描述。如果名字比较…...

在Linux系统安装Kafka

注意&#xff1a;我的是在云服务器上基于Docker配 在防火墙上放行端口号 2181(Zookeeper) 9092(Kafka) 一、先配置 Docker 守护进程&#xff08;daemon&#xff09;的镜像加速器&#xff08;registry mirrors&#xff09; sudo mkdir -p /etc/docker sudo tee /etc/docker/da…...

【CSharp】简单定义一个异步方法

【CSharp】定义一个异步方法 1.背景2.异步方法3.代码说明1.背景 相关博客: 【CSharp】使用异步事件处理程序和委托来进行异步调用 https://blog.csdn.net/jn10010537/article/details/140898179在 C# 中,异步方法和同步方法是两种执行代码的方式, 它们主要区别在于处理任务…...

贪心算法之货仓选址问题

#include<stdio.h> #include<stdlib.h> #include<math.h>//贪心算法之货仓选址问题/*** void* p是万能指针&#xff0c;可以和其它任意类型的指针进行转换&#xff0c;前提是确保转换是合法的*/ //写好用于qsort的比较函数&#xff0c;这里写的函数一般用于…...

Java网络编程——Request Response 对象

Response - 网页 上一章我们学习了 Java 中使用 Okhttp3 库请求网页或调用 API 的知识。 使用一条语句执行调用请求&#xff0c;并取得返回结果字符串&#xff1a; call.execute().body().string()execute() 方法是真正执行发送请求&#xff0c;前面的一系列代码是做前置准备…...

【代码随想录训练营第42期 Day24打卡 回溯Part3 - LeetCode 93.复原IP地址 78.子集 90.子集II

目录 一、做题心得 二、题目与题解 题目一&#xff1a;93.复原IP地址 题目链接 题解&#xff1a;回溯--分割问题 题目二&#xff1a;78.子集 题目链接 题解&#xff1a;回溯--子集问题 题目三&#xff1a;90.子集II 题目链接 题解&#xff1a;回溯--子集问题 三、小…...

python venv和virtualenv详解

一、venv简介 C:\Users\love1>python -m venv -h usage: venv [-h] [--system-site-packages] [--symlinks | --copies] [--clear] [--upgrade] [--without-pip][--prompt PROMPT] [--upgrade-deps]ENV_DIR [ENV_DIR ...]该命令用于在一个目录或者多个目录中创建一个虚拟的…...

《征服数据结构》树堆(Treap)

摘要&#xff1a; 1&#xff0c;Treap的介绍 2&#xff0c;Treap节点的插入 3&#xff0c;Treap节点的删除 4&#xff0c;Treap和笛卡尔树的区别 1&#xff0c;Treap的介绍 Treap又叫树堆&#xff0c;属于一种自平衡二叉搜索树&#xff0c;是由单词Tree和Heap构成&#xff0c;是…...

论文笔记:OneBit: Towards Extremely Low-bit Large Language Models

202402 arxiv 1 背景 模型量化主要通过把模型的线性层【nn.Linear】&#xff08;Embedding 层和 Lm_head 层除外&#xff09;转化为低精度表示实现空间压缩 此前工作的基础是利用 Round-To-Nearest&#xff08;RTN&#xff09;方法把高精度浮点数近似映射到附近的整数网格然而…...

英语文化中的音乐分类及其发展历史(Classical、Jazz、Rock、Pop、Electronic、Country、RB、Hip-Hop)

文章目录 英语文化中的音乐分类及其发展历史1. 简介2. 古典音乐 (Classical Music)2.1 起源与发展2.2 技术与风格 3. 爵士音乐 (Jazz Music)3.1 起源与发展3.2 技术与风格 4. 摇滚音乐 (Rock Music)&#xff08;Rock and roll&#xff09;4.1 起源与发展4.2 技术与风格 5. 蓝调…...

C语言-栈、队列、二叉树

12 栈、队列、二叉树 目录 12 栈、队列、二叉树 一、栈、队列、二叉树是什么&#xff1f; 二、栈 1. 特点&#xff1a;先进后出 -- 有底的盒子 2. 使用场景&#xff1a;函数调用 -- 中断机制 3. 实现栈的形式&#xff1a; 三、队列 1. 特点&#xff1a;先进先出 -- 水…...

pinia-plugin-persistedstate 插件不生效

引入使用该插件使用时发现不生效 原因&#xff1a;pinia实例调用顺序不当 将&#xff1a; // import ./assets/main.css import { createApp } from vue import { createPinia } from pinia import piniaPluginPersistedstate from pinia-plugin-persistedstate import App fr…...

内存分配函数malloc kmalloc vmalloc

内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

linux之kylin系统nginx的安装

一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源&#xff08;HTML/CSS/图片等&#xff09;&#xff0c;响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址&#xff0c;提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

智慧医疗能源事业线深度画像分析(上)

引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

Lombok 的 @Data 注解失效,未生成 getter/setter 方法引发的HTTP 406 错误

HTTP 状态码 406 (Not Acceptable) 和 500 (Internal Server Error) 是两类完全不同的错误&#xff0c;它们的含义、原因和解决方法都有显著区别。以下是详细对比&#xff1a; 1. HTTP 406 (Not Acceptable) 含义&#xff1a; 客户端请求的内容类型与服务器支持的内容类型不匹…...

在rocky linux 9.5上在线安装 docker

前面是指南&#xff0c;后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...

【JVM】- 内存结构

引言 JVM&#xff1a;Java Virtual Machine 定义&#xff1a;Java虚拟机&#xff0c;Java二进制字节码的运行环境好处&#xff1a; 一次编写&#xff0c;到处运行自动内存管理&#xff0c;垃圾回收的功能数组下标越界检查&#xff08;会抛异常&#xff0c;不会覆盖到其他代码…...

Qt Http Server模块功能及架构

Qt Http Server 是 Qt 6.0 中引入的一个新模块&#xff0c;它提供了一个轻量级的 HTTP 服务器实现&#xff0c;主要用于构建基于 HTTP 的应用程序和服务。 功能介绍&#xff1a; 主要功能 HTTP服务器功能&#xff1a; 支持 HTTP/1.1 协议 简单的请求/响应处理模型 支持 GET…...

WordPress插件:AI多语言写作与智能配图、免费AI模型、SEO文章生成

厌倦手动写WordPress文章&#xff1f;AI自动生成&#xff0c;效率提升10倍&#xff01; 支持多语言、自动配图、定时发布&#xff0c;让内容创作更轻松&#xff01; AI内容生成 → 不想每天写文章&#xff1f;AI一键生成高质量内容&#xff01;多语言支持 → 跨境电商必备&am…...

有限自动机到正规文法转换器v1.0

1 项目简介 这是一个功能强大的有限自动机&#xff08;Finite Automaton, FA&#xff09;到正规文法&#xff08;Regular Grammar&#xff09;转换器&#xff0c;它配备了一个直观且完整的图形用户界面&#xff0c;使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...

MySQL 索引底层结构揭秘:B-Tree 与 B+Tree 的区别与应用

文章目录 一、背景知识&#xff1a;什么是 B-Tree 和 BTree&#xff1f; B-Tree&#xff08;平衡多路查找树&#xff09; BTree&#xff08;B-Tree 的变种&#xff09; 二、结构对比&#xff1a;一张图看懂 三、为什么 MySQL InnoDB 选择 BTree&#xff1f; 1. 范围查询更快 2…...