当前位置: 首页 > news >正文

【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。

一、下载github.com/segmentio/kafka-go包

go get github.com/segmentio/kafka-go

二、建立kafka连接

正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9092"    //host 具体看你们自己的配置如果是服务器上的 就是服务器iP:9092 本地就是localhost:9092
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaConn() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}

三、kafka之发送消息(生产者)

/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}

四、kafka之接收消息(消费者)

// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn() {// 指定要连接的topic和partition// 连接至Kafka的leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {p := People{}n, err := batch.Read(b)if err != nil {break}err = json.Unmarshal(b[:n], &p)if err != nil {fmt.Println(string(b))fmt.Println(err, "**************")continue}fmt.Println(p)}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}

五、kafka之消费者组实现消息确认(从一次消费消息的末尾开始接收消息)

只需要给读取消息的方法改变一下就可以了


func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

完整代码

package mainimport ("context""encoding/json""fmt""github.com/segmentio/kafka-go""log""time"
)/*
People消息的格式 标准情况下应该写在model层的结构体
*/
type People struct {Name string `json:"name"`Pwd  string `json:"pwd"`
}/*
正常来说下面的配置host topic partition 应该写在配置文件里
*/
const host = "localhost:9091"
const topic = "my"
const partition = 0/*
NewKafKaCon kafka的客户端连接的初始化方法
*/
func NewKafKaCon() (*kafka.Conn, error) {return kafka.DialLeader(context.Background(), "tcp", host, topic, partition)
}func main() {writeByConn()readByConn()
}// writeByConn 基于Conn发送消息
func writeByConn() {// 连接至Kafka集群的Leader节点conn, err := NewKafKaCon()if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))people1 := People{"Tmo","124"}people2 := People{"Mac","124"}people3 := People{"Joker","124"}// 发送消息str1, _ := json.Marshal(people1)str2, _ := json.Marshal(people2)str3, _ := json.Marshal(people3)_, err = conn.WriteMessages(kafka.Message{Value: []byte(str1)},kafka.Message{Value: []byte(str2)},kafka.Message{Value: []byte(str3)},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}func readByConn() {r := kafka.NewReader(kafka.ReaderConfig{Brokers:  []string{host},GroupID:  "consumer-group-id",Topic:    topic,MaxBytes: 10e6, // 10MB})for {m, err := r.ReadMessage(context.Background())if err != nil {break}fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))}
}

相关文章:

【Kafka-go】golang的kafka应用

网络上关于go的Kafka还是比较少的今天就先出一篇入门级别的,之后再看看能能出一个公司业务场景中的消息流。 一、下载github.com/segmentio/kafka-go包 go get github.com/segmentio/kafka-go二、建立kafka连接 正常来说下面的配置host topic partition 应该写在…...

redis:set集合命令,内部编码,使用场景

个人主页 : 个人主页 个人专栏 : 《数据结构》 《C语言》《C》《Linux》《网络》 《redis学习笔记》 文章目录 前言命令SADDSMEMBERSSISMEMBERSCARDSPOPSMOVESREM集合间操作SINTERSINTERSTORESUNIONSUNIONSTORESDIFFSDIFFSTORE 内部编码使用场景总结 前言…...

45期代码随想录算法营总结

代码随想录训练营总结与收获 在为期60天的代码随想录训练营结束后,我感慨良多。这段时间不仅让我在编程技能上有了明显的提升,更让我在学习习惯和时间管理上有了深刻的反思和改变。 报名参加这个训练营对我来说是一个重要的监督机制。之前我总是拖延&a…...

深入理解Java中的instanceof关键字及接口新特性:方法实现的可能性

目录 引言 1. 什么是instanceof关键字? 1.1 语法结构 1.2 instanceof的用法示例 1.3 instanceof的应用场景 2. Java中的接口能包含方法实现吗? 2.1 默认方法(Default Method) 2.2 静态方法(Static Method&…...

【python中如果class没有self会怎行】

python中如果class没有self会怎样TOC 在Python中,self是一个约定俗成的名称,用于表示类的实例。如果没有使用self,会导致以下问题: 1、无法访问实例属性: 在类的方法中,如果没有self,方法将无…...

【算法】(Python)动态规划

动态规划: dynamic programming。"programming"指的是一种表格法,而非编写计算机程序。通常解决最优化问题(optimization problem)。将问题拆分成若干个子问题,求解各子问题来得到原问题的解。适用于多阶段…...

EasyExcel 学习之 导出 “提示问题”

EasyExcel 学习之 导出 “提示问题” 现象分析解决(伪代码)前端 POST 实现后端实现 现象 EasyExcel 支持导出 xlsx、xls、csv 三种文件格式。在导出过程中可能发生各种异常,当发生异常时应该提示错误信息而非导出一个错误的文件。 分析 首…...

应用系统开发(3)低功耗四运算放大器LM324N

LM324N 是一种广泛使用的 低功耗四运算放大器,由德州仪器(Texas Instruments)和其他制造商生产。它具有四个独立的运算放大器,能够在单电源或双电源模式下运行,适合多种模拟电路应用。以下是详细信息: 芯片基本信息 型号:LM324N封装类型:常见 DIP(双列直插封装)或 SO…...

基于微信小程序的电商平台+LW示例参考

1.项目介绍 系统角色:管理员、普通用户功能模块:管理员(用户管理、商品分类、商品管理、订单管理、系统管理等),普通用户(个人中心、收藏、我的订单、查看商品等)技术选型:SpringBo…...

[Android] Graphic Buffer 的申请

前言: MediaCodec 支持 texture mode,即MediaCodec解码video完毕后把 yuv 数据填入 GPU 共享出来的 graphic buffer 里面,app 会把 video 的 yuv数据 和 ui 的数据通过通过软件渲染组件(opengl等)发送给GPU 进行一并渲染。这样做的效率较低&…...

【大数据学习 | HBASE高级】storeFile文件的合并

Compaction 操作分成下面两种: Minor Compaction:是选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,对于删除、过期、多余版本的数据不进行清除。 Major Compaction:是指将所有的StoreFile合并成一个StoreFile&am…...

多平台编包动态引入依赖的解决方案

最近开发时遇到了这样的需求,A 平台需要引入一个 video.js,B 平台却是不需要的,那么面向 B 平台打包的时候把依赖装进去自然就不大合适。最好的方法是动态引入依赖,根据平台来判断要不要引入 动态引入依赖 很快啊,动…...

[单例模式]

目录 [设计模式] 单例模式 1. 饿汉模式 2. 懒汉模式 3. 单例模式的线程安全问题 [设计模式] 设计模式是软件工程中的一种常见做法, 它可以理解为"模板", 是针对一些常见的特定场景, 给出的一些比较好的固定的解决方案. 不同语言适用的设计模式是不一样的. 这里…...

速盾:游戏盾的功能和原理详解

速盾有一款专注于网络游戏安全的防护系统,它通过实时监测游戏网络流量和玩家行为,以及使用先进的算法和技术进行分析和识别,检测出各种外挂、作弊行为和恶意攻击,从而保障游戏的公平性和玩家的安全性。 速盾游戏盾的主要功能包括…...

Spleeter:音频分离的革命性工具

目录 什么是Spleeter?Spleeter的工作原理Spleeter的应用场景Spleeter的技术优势Spleeter的挑战与局限性结论 什么是Spleeter? Spleeter 是一个由 Deezer 开发的开源音频源分离工具。它基于深度学习技术,尤其是卷积神经网络(CNN&a…...

【笔记】自动驾驶预测与决策规划_Part6_不确定性感知的决策过程

文章目录 0. 前言1. 部分观测的马尔可夫决策过程1.1 POMDP的思想以及与MDP的联系1.1.1 MDP的过程回顾1.1.2 POMDP定义1.1.3 与MDP的联系及区别POMDP 视角MDP 视角决策次数对最优解的影响 1.2 POMDP的3种常规解法1.2.1 连续状态的“Belief MDP”方法1. 信念状态的定义2. Belief …...

openresty入门教程:access_by_lua_block

在OpenResty中,access_by_lua_block 是一个功能强大的指令,它允许你在Nginx的访问控制阶段执行Lua脚本。这个阶段发生在Nginx处理请求的过程中,紧接在rewrite阶段之后,但在请求被传递到后端服务器(如PHP、Node.js等&am…...

Caused by: org.apache.flink.api.common.io.ParseException: Row too short:

Flink版本 1.17.2 错误描述 Caused by: org.apache.flink.api.common.io.ParseException: Row too short: 通过flink中的flinkSql直接使用对应的connector去获取csv文件内容,报获取的数据太短了 可能原因 1.创建的表字段多于csv文件当中的表头 定位 在获取csv…...

hbase的安装与简单操作

好的,这里是关于 HBase 的安装和基本操作的详细步骤,分成几个更清晰的阶段: 第一部分:安装和配置 HBase 1. 环境准备 HBase 依赖于 Hadoop,因此首先确保 Hadoop 已经正确安装和配置。如果没有安装,请先下…...

PySpark本地开发环境搭建

一.前置事项 请注意,需要先实现Windows的本地JDK和Hadoop的安装。 二.windows安装Anaconda 资源:Miniconda3-py38-4.11.0-Windows-x86-64,在window使用的Anaconda资源-CSDN文库 右键以管理员身份运行,选择你的安装路径&#x…...

shell脚本--常见案例

1、自动备份文件或目录 2、批量重命名文件 3、查找并删除指定名称的文件: 4、批量删除文件 5、查找并替换文件内容 6、批量创建文件 7、创建文件夹并移动文件 8、在文件夹中查找文件...

【JVM】- 内存结构

引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...

【大模型RAG】Docker 一键部署 Milvus 完整攻略

本文概要 Milvus 2.5 Stand-alone 版可通过 Docker 在几分钟内完成安装;只需暴露 19530(gRPC)与 9091(HTTP/WebUI)两个端口,即可让本地电脑通过 PyMilvus 或浏览器访问远程 Linux 服务器上的 Milvus。下面…...

Java多线程实现之Callable接口深度解析

Java多线程实现之Callable接口深度解析 一、Callable接口概述1.1 接口定义1.2 与Runnable接口的对比1.3 Future接口与FutureTask类 二、Callable接口的基本使用方法2.1 传统方式实现Callable接口2.2 使用Lambda表达式简化Callable实现2.3 使用FutureTask类执行Callable任务 三、…...

Keil 中设置 STM32 Flash 和 RAM 地址详解

文章目录 Keil 中设置 STM32 Flash 和 RAM 地址详解一、Flash 和 RAM 配置界面(Target 选项卡)1. IROM1(用于配置 Flash)2. IRAM1(用于配置 RAM)二、链接器设置界面(Linker 选项卡)1. 勾选“Use Memory Layout from Target Dialog”2. 查看链接器参数(如果没有勾选上面…...

python爬虫:Newspaper3k 的详细使用(好用的新闻网站文章抓取和解析的Python库)

更多内容请见: 爬虫和逆向教程-专栏介绍和目录 文章目录 一、Newspaper3k 概述1.1 Newspaper3k 介绍1.2 主要功能1.3 典型应用场景1.4 安装二、基本用法2.2 提取单篇文章的内容2.2 处理多篇文档三、高级选项3.1 自定义配置3.2 分析文章情感四、实战案例4.1 构建新闻摘要聚合器…...

AirSim/Cosys-AirSim 游戏开发(四)外部固定位置监控相机

这个博客介绍了如何通过 settings.json 文件添加一个无人机外的 固定位置监控相机,因为在使用过程中发现 Airsim 对外部监控相机的描述模糊,而 Cosys-Airsim 在官方文档中没有提供外部监控相机设置,最后在源码示例中找到了,所以感…...

MFC 抛体运动模拟:常见问题解决与界面美化

在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...

并发编程 - go版

1.并发编程基础概念 进程和线程 A. 进程是程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。B. 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位。C.一个进程可以创建和撤销多个线程;同一个进程中…...

Web中间件--tomcat学习

Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机,它可以执行Java字节码。Java虚拟机是Java平台的一部分,Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...