当前位置: 首页 > news >正文

go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

环境的搭建

Kafka以及相关组件的下载

我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:
Windows下安装Kafka(图文记录详细步骤)

sarama包的安装

今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:

  • 在项目中创建文件夹(博主的是Kafkademo)
  • 打开终端,输入go mod init,进行go.mod文件的初始化:
    在这里插入图片描述
    在这里插入图片描述
  • 我们在.mod文件内指定第三方包及其版本:
module Kafkademorequire (github.com/Shopify/sarama v1.19
)go 1.21.6

其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:

package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll                                
}

这时候再打开终端输入go mod tidy
在这里插入图片描述
等待命令运行完毕,打开.mod文件,看到如下内容就OK了:
在这里插入图片描述

利用sarama向Kafka发送消息(消息的生产)

代码

package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()                              //创建config实例config.Producer.RequiredAcks = sarama.WaitForAll          //发送完数据需要leader和follow都确认config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区config.Producer.Return.Successes = true                   //成功交付的消息将在success channel返回//创建信息msg := &sarama.ProducerMessage{}msg.Topic = "web.log"msg.Value = sarama.StringEncoder("this is a test log")//连接KafKaclient, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)if err != nil {fmt.Println("producer closed, err:", err)return}defer client.Close()//发送消息pid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send msg failed,err:", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)
}

运行过程

  • 首先我们打开终端开起ZooKepper服务
    zkServer
    
    在这里插入图片描述
  • 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

在这里插入图片描述
最后运行程序即可,输出结果为:
在这里插入图片描述

补充:消息的消费

代码

package mainimport ("fmt""github.com/Shopify/sarama""time"
)func main() {customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)if err != nil {fmt.Println("failed init customer,err:", err)return}partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区if err != nil {fmt.Println("failed get partition list,err:", err)return}fmt.Println("partitions:", partitionlist)for partition := range partitionlist { // 遍历所有分区//根据消费者对象创建一个分区对象pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Println("failed get partition consumer,err:", err)return}defer pc.Close() // 移动到这里go func(consumer sarama.PartitionConsumer) {defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了for msg := range pc.Messages() {fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)}}(pc)time.Sleep(time.Second * 10)}
}

不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍。

相关文章:

go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费

环境的搭建 Kafka以及相关组件的下载 我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境&#x…...

Go 单元测试之Mysql数据库集成测试

文章目录 一、 sqlmock介绍二、安装三、基本用法四、一个小案例五、Gorm 初始化注意点 一、 sqlmock介绍 sqlmock 是一个用于测试数据库交互的 Go 模拟库。它可以模拟 SQL 查询、插入、更新等操作,并且可以验证 SQL 语句的执行情况,非常适合用于单元测试…...

Prometheus + Grafana 搭建监控仪表盘

目标要求 1、需要展现的仪表盘: SpringBoot或JVM仪表盘 Centos物理机服务器(实际为物理分割的虚拟服务器)仪表盘 2、展现要求: 探索Prometheus Grafana搭建起来的展示效果,尽可能展示能展示的部分。 一、下载软件包 监控系统核心…...

机器人管理系统的增删查改(Python)

#交互模式 robot ["机器人1","机器人2","机器人3","机器人4"] name input("请输入您的姓名:") print("%s您好欢迎使用机器人管理系统"%(name))while True:print("您可以进行 1.查找 2.修改 3.增…...

【.Net动态Web API】背景与实现原理

🚀前言 本文是《.Net Core进阶编程课程》教程专栏的导航站(点击链接,跳转到专栏主页,欢迎订阅,持续更新…) 专栏介绍:通过源码实例来讲解Asp.Net Core进阶知识点,让大家完全掌握每一…...

JS-43-Node.js02-安装Node.js和npm

Node.js是一个基于Chrome V8引擎的JavaScript运行时环境,可以让JavaScript实现后端开发,所以,首先在本机安装Node.js环境。 一、安装Node.js 官网:下载 Node.js 默认两个版本的下载: 64位windows系统的LTS(Long Tim…...

设计模式(分类)

目录 设计模式(分类) 设计模式(六大原则) 设计模式是软件工程中一种经过验证的、用于解决特定设计问题的通用解决方案。它们是面向对象编程(Object-Oriented Programming, OOP)实践中提炼出的最佳实…...

请陪伴Kimi和GPT成长

经验的闪光汤圆 但是我想要写实的 你有吗? 岁数大了,希望如何学习新知识呢?又觉得自己哪些能力亟需补强呢? 看论文自然得用Kimi,主要是肝不动了,眼睛也顶不住了。 正好昨天跟专业人士学会了用工作流的办法跟…...

优思学院|ISO45001职业健康安全管理体系是什么?

ISO45001:2018是新公布的国际标准规范,全球备受期待的职业健康与安全国际标准(OH&S)于2018年公布,并将在全球范围内改变工作场所实践。ISO45001将取代OHSAS18001,成为全球工作场所健康与安全的参考。 ISO45001:201…...

抖去推短视频矩阵系统----源头开发

为什么一直说让企业去做短视频矩阵?而好处就是有更多的流量入口,不同平台或账号之间可以进行资源互换,最终目的就是获客留咨,提单转化。你去看一些做得大的账号,你会发现他们在许多大的平台上,都有自己的账…...

Golang函数重试机制实现

前言 在编写应用程序时,有时候会遇到一些短暂的错误,例如网络请求、服务链接终端失败等,这些错误可能导致函数执行失败。 但是如果稍后执行可能会成功,那么在一些业务场景下就需要重试了,重试的概念很简单&#xff0c…...

工业电脑在ESOP工作站行业应用

ESOP工作站行业应用 项目背景 E-SOP是实现作业指导书电子化,并统一管理和集中控制的一套管理信息平台。信迈科技的ESOP终端是一款体积小巧功能齐全的高性价比工业电脑,上层通过网络与MES系统连接,下层连接显示器展示作业指导书。ESOP控制终…...

java项目实战之图书管理系统(1)

✅作者简介:大家好,我是再无B~U~G,一个想要与大家共同进步的男人😉😉 🍎个人主页:再无B~U~G-CSDN博客 1.背景 图书管理系统是一种用于管理图书…...

3DGS渐进式渲染 - 离线生成渲染视频

总览 输入:环绕Object拍摄的RGB视频 输出:自定义相机路径的渲染视频(包含渐变效果) 实现过程 首先,编译3DGS的C代码,并跑通convert.py、train.py和render.py。教程如下: github网址&#xf…...

chromium 协议栈 cronet ios 踩坑案例

1、请求未携带 Accept-Language http header 出现图片加载失败 现象: 访问 https://www.huawei.com/cn/?ic_mediumdirect&ic_sourcesurlent 时出现图片加载失败的问题 预期结果: 原因: 网络库删除了添加 Accept-Language header 的逻…...

Java快速排序知识点(含面试大厂题和源码)

快速排序(Quick Sort)是一种高效的排序算法,采用分治法(Divide and Conquer)的策略来对一个数组进行排序。快速排序的平均时间复杂度为 O(n log n),在最坏的情况下为 O(n^2),但这种情况很少发生…...

SpringBoot整合Swagger2

SpringBoot整合Swagger2 1.什么是Swagger2?(应用场景)2.项目中如何使用2.1 导入依赖2.2 编写配置类2.3 注解使用2.3.1 controller注解:2.3.2 方法注解2.3.3 实体类注解2.3.4 方法返回值注解2.3.5 忽略的方法 3.UI界面 1.什么是Swa…...

C++算法题 - 矩阵

目录 36. 有效的数独54. 螺旋矩阵48. 旋转图像73. 矩阵置零289. 生命游戏 36. 有效的数独 LeetCode_link 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 ,验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。 数字 1-9 在每一列只能出现…...

记录一个没测出来,有点严重的Bug

前提: 人物:若干个 部门:若干个 部门有一个人物选择框,可以选择所有的人物,且为非必填字段 bug现象: 部门中 的人物选择框每次都少一个人物 代码分析: F12接口后端没问题,定位为前端的问题。 前…...

科学突破可能开创6G通信新时代

格拉斯哥大学开发的火柴盒大小的天线可以为全息通话、改进自动驾驶和更好的医疗保健的世界铺平道路。 格拉斯哥大学表示,这种创新的无线通信天线将超材料的独特特性与复杂的信号处理相结合,有助于构建未来的 6G 网络。 数字编码动态超表面天线&#xf…...

业务系统对接大模型的基础方案:架构设计与关键步骤

业务系统对接大模型:架构设计与关键步骤 在当今数字化转型的浪潮中,大语言模型(LLM)已成为企业提升业务效率和创新能力的关键技术之一。将大模型集成到业务系统中,不仅可以优化用户体验,还能为业务决策提供…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻

在如今就业市场竞争日益激烈的背景下,越来越多的求职者将目光投向了日本及中日双语岗位。但是,一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧?面对生疏的日语交流环境,即便提前恶补了…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...

今日科技热点速览

🔥 今日科技热点速览 🎮 任天堂Switch 2 正式发售 任天堂新一代游戏主机 Switch 2 今日正式上线发售,主打更强图形性能与沉浸式体验,支持多模态交互,受到全球玩家热捧 。 🤖 人工智能持续突破 DeepSeek-R1&…...

IT供电系统绝缘监测及故障定位解决方案

随着新能源的快速发展,光伏电站、储能系统及充电设备已广泛应用于现代能源网络。在光伏领域,IT供电系统凭借其持续供电性好、安全性高等优势成为光伏首选,但在长期运行中,例如老化、潮湿、隐裂、机械损伤等问题会影响光伏板绝缘层…...

全面解析各类VPN技术:GRE、IPsec、L2TP、SSL与MPLS VPN对比

目录 引言 VPN技术概述 GRE VPN 3.1 GRE封装结构 3.2 GRE的应用场景 GRE over IPsec 4.1 GRE over IPsec封装结构 4.2 为什么使用GRE over IPsec? IPsec VPN 5.1 IPsec传输模式(Transport Mode) 5.2 IPsec隧道模式(Tunne…...

如何在最短时间内提升打ctf(web)的水平?

刚刚刷完2遍 bugku 的 web 题,前来答题。 每个人对刷题理解是不同,有的人是看了writeup就等于刷了,有的人是收藏了writeup就等于刷了,有的人是跟着writeup做了一遍就等于刷了,还有的人是独立思考做了一遍就等于刷了。…...

LINUX 69 FTP 客服管理系统 man 5 /etc/vsftpd/vsftpd.conf

FTP 客服管理系统 实现kefu123登录,不允许匿名访问,kefu只能访问/data/kefu目录,不能查看其他目录 创建账号密码 useradd kefu echo 123|passwd -stdin kefu [rootcode caozx26420]# echo 123|passwd --stdin kefu 更改用户 kefu 的密码…...

Java编程之桥接模式

定义 桥接模式(Bridge Pattern)属于结构型设计模式,它的核心意图是将抽象部分与实现部分分离,使它们可以独立地变化。这种模式通过组合关系来替代继承关系,从而降低了抽象和实现这两个可变维度之间的耦合度。 用例子…...

深度学习水论文:mamba+图像增强

🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...