go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
环境的搭建
Kafka以及相关组件的下载
我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境,这里我们个大家找了一篇博文供大家参考:
Windows下安装Kafka(图文记录详细步骤)
sarama包的安装
今天我们所时机的内容需要用到go语言的第三方包sarama,由于1.19版本后添加了ztcd压缩算法,需要用到cgo,这里我们为了方便考虑选择下载sarama v1.19.0,所以这里我们不能直接使用go get'命令来安装第三方包,我们要使用/go mod文件来实现,下面是主要步骤:
- 在项目中创建文件夹(博主的是Kafkademo)
- 打开终端,输入
go mod init,进行go.mod文件的初始化:


- 我们在.mod文件内指定第三方包及其版本:
module Kafkademorequire (github.com/Shopify/sarama v1.19
)go 1.21.6
其实这是已经可以使用命令go mod tidy了,但是博主在做的时候发现,这样会直接清除掉.mod文件里面的内容,所以建议先创建一个producer文件,在文件里面写:
package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig()config.Producer.RequiredAcks = sarama.WaitForAll
}
这时候再打开终端输入go mod tidy

等待命令运行完毕,打开.mod文件,看到如下内容就OK了:

利用sarama向Kafka发送消息(消息的生产)
代码
package mainimport ("fmt""github.com/Shopify/sarama"
)func main() {config := sarama.NewConfig() //创建config实例config.Producer.RequiredAcks = sarama.WaitForAll //发送完数据需要leader和follow都确认config.Producer.Partitioner = sarama.NewRandomPartitioner //创建随机分区config.Producer.Return.Successes = true //成功交付的消息将在success channel返回//创建信息msg := &sarama.ProducerMessage{}msg.Topic = "web.log"msg.Value = sarama.StringEncoder("this is a test log")//连接KafKaclient, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)if err != nil {fmt.Println("producer closed, err:", err)return}defer client.Close()//发送消息pid, offset, err := client.SendMessage(msg)if err != nil {fmt.Println("send msg failed,err:", err)return}fmt.Printf("pid:%v offset:%v\n", pid, offset)
}
运行过程
- 首先我们打开终端开起ZooKepper服务
zkServer
- 然后再Kafka所在文件夹下输入命令运行Kafka:
.\bin\windows\kafka-server-start.bat .\config\server.properties

最后运行程序即可,输出结果为:

补充:消息的消费
代码
package mainimport ("fmt""github.com/Shopify/sarama""time"
)func main() {customer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)if err != nil {fmt.Println("failed init customer,err:", err)return}partitionlist, err := customer.Partitions("web.log-0") //获取topic的所有分区if err != nil {fmt.Println("failed get partition list,err:", err)return}fmt.Println("partitions:", partitionlist)for partition := range partitionlist { // 遍历所有分区//根据消费者对象创建一个分区对象pc, err := customer.ConsumePartition("web.log", int32(partition), sarama.OffsetNewest)if err != nil {fmt.Println("failed get partition consumer,err:", err)return}defer pc.Close() // 移动到这里go func(consumer sarama.PartitionConsumer) {defer pc.AsyncClose() // 移除这行,因为已经在循环结束时关闭了for msg := range pc.Messages() {fmt.Printf("Partition:%d Offset:%d Key:%v Value:%v", msg.Partition, msg.Offset, msg.Key, msg.Value)}}(pc)time.Sleep(time.Second * 10)}
}
不过这个不能实现直接的消费,后续我们会对这个进行补充,这里仅作介绍。
相关文章:
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
环境的搭建 Kafka以及相关组件的下载 我们要实现今天的内容,不可避免的要进行对开发环境的配置,Kafka环境的配置比较繁琐,需要配置JDK,Scala,ZoopKeeper和Kafka,这里我们不做赘述,如果大家不知道如何配置环境&#x…...
Go 单元测试之Mysql数据库集成测试
文章目录 一、 sqlmock介绍二、安装三、基本用法四、一个小案例五、Gorm 初始化注意点 一、 sqlmock介绍 sqlmock 是一个用于测试数据库交互的 Go 模拟库。它可以模拟 SQL 查询、插入、更新等操作,并且可以验证 SQL 语句的执行情况,非常适合用于单元测试…...
Prometheus + Grafana 搭建监控仪表盘
目标要求 1、需要展现的仪表盘: SpringBoot或JVM仪表盘 Centos物理机服务器(实际为物理分割的虚拟服务器)仪表盘 2、展现要求: 探索Prometheus Grafana搭建起来的展示效果,尽可能展示能展示的部分。 一、下载软件包 监控系统核心…...
机器人管理系统的增删查改(Python)
#交互模式 robot ["机器人1","机器人2","机器人3","机器人4"] name input("请输入您的姓名:") print("%s您好欢迎使用机器人管理系统"%(name))while True:print("您可以进行 1.查找 2.修改 3.增…...
【.Net动态Web API】背景与实现原理
🚀前言 本文是《.Net Core进阶编程课程》教程专栏的导航站(点击链接,跳转到专栏主页,欢迎订阅,持续更新…) 专栏介绍:通过源码实例来讲解Asp.Net Core进阶知识点,让大家完全掌握每一…...
JS-43-Node.js02-安装Node.js和npm
Node.js是一个基于Chrome V8引擎的JavaScript运行时环境,可以让JavaScript实现后端开发,所以,首先在本机安装Node.js环境。 一、安装Node.js 官网:下载 Node.js 默认两个版本的下载: 64位windows系统的LTS(Long Tim…...
设计模式(分类)
目录 设计模式(分类) 设计模式(六大原则) 设计模式是软件工程中一种经过验证的、用于解决特定设计问题的通用解决方案。它们是面向对象编程(Object-Oriented Programming, OOP)实践中提炼出的最佳实…...
请陪伴Kimi和GPT成长
经验的闪光汤圆 但是我想要写实的 你有吗? 岁数大了,希望如何学习新知识呢?又觉得自己哪些能力亟需补强呢? 看论文自然得用Kimi,主要是肝不动了,眼睛也顶不住了。 正好昨天跟专业人士学会了用工作流的办法跟…...
优思学院|ISO45001职业健康安全管理体系是什么?
ISO45001:2018是新公布的国际标准规范,全球备受期待的职业健康与安全国际标准(OH&S)于2018年公布,并将在全球范围内改变工作场所实践。ISO45001将取代OHSAS18001,成为全球工作场所健康与安全的参考。 ISO45001:201…...
抖去推短视频矩阵系统----源头开发
为什么一直说让企业去做短视频矩阵?而好处就是有更多的流量入口,不同平台或账号之间可以进行资源互换,最终目的就是获客留咨,提单转化。你去看一些做得大的账号,你会发现他们在许多大的平台上,都有自己的账…...
Golang函数重试机制实现
前言 在编写应用程序时,有时候会遇到一些短暂的错误,例如网络请求、服务链接终端失败等,这些错误可能导致函数执行失败。 但是如果稍后执行可能会成功,那么在一些业务场景下就需要重试了,重试的概念很简单,…...
工业电脑在ESOP工作站行业应用
ESOP工作站行业应用 项目背景 E-SOP是实现作业指导书电子化,并统一管理和集中控制的一套管理信息平台。信迈科技的ESOP终端是一款体积小巧功能齐全的高性价比工业电脑,上层通过网络与MES系统连接,下层连接显示器展示作业指导书。ESOP控制终…...
java项目实战之图书管理系统(1)
✅作者简介:大家好,我是再无B~U~G,一个想要与大家共同进步的男人😉😉 🍎个人主页:再无B~U~G-CSDN博客 1.背景 图书管理系统是一种用于管理图书…...
3DGS渐进式渲染 - 离线生成渲染视频
总览 输入:环绕Object拍摄的RGB视频 输出:自定义相机路径的渲染视频(包含渐变效果) 实现过程 首先,编译3DGS的C代码,并跑通convert.py、train.py和render.py。教程如下: github网址…...
chromium 协议栈 cronet ios 踩坑案例
1、请求未携带 Accept-Language http header 出现图片加载失败 现象: 访问 https://www.huawei.com/cn/?ic_mediumdirect&ic_sourcesurlent 时出现图片加载失败的问题 预期结果: 原因: 网络库删除了添加 Accept-Language header 的逻…...
Java快速排序知识点(含面试大厂题和源码)
快速排序(Quick Sort)是一种高效的排序算法,采用分治法(Divide and Conquer)的策略来对一个数组进行排序。快速排序的平均时间复杂度为 O(n log n),在最坏的情况下为 O(n^2),但这种情况很少发生…...
SpringBoot整合Swagger2
SpringBoot整合Swagger2 1.什么是Swagger2?(应用场景)2.项目中如何使用2.1 导入依赖2.2 编写配置类2.3 注解使用2.3.1 controller注解:2.3.2 方法注解2.3.3 实体类注解2.3.4 方法返回值注解2.3.5 忽略的方法 3.UI界面 1.什么是Swa…...
C++算法题 - 矩阵
目录 36. 有效的数独54. 螺旋矩阵48. 旋转图像73. 矩阵置零289. 生命游戏 36. 有效的数独 LeetCode_link 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 ,验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。 数字 1-9 在每一列只能出现…...
记录一个没测出来,有点严重的Bug
前提: 人物:若干个 部门:若干个 部门有一个人物选择框,可以选择所有的人物,且为非必填字段 bug现象: 部门中 的人物选择框每次都少一个人物 代码分析: F12接口后端没问题,定位为前端的问题。 前…...
科学突破可能开创6G通信新时代
格拉斯哥大学开发的火柴盒大小的天线可以为全息通话、改进自动驾驶和更好的医疗保健的世界铺平道路。 格拉斯哥大学表示,这种创新的无线通信天线将超材料的独特特性与复杂的信号处理相结合,有助于构建未来的 6G 网络。 数字编码动态超表面天线…...
HTML 语义化
目录 HTML 语义化HTML5 新特性HTML 语义化的好处语义化标签的使用场景最佳实践 HTML 语义化 HTML5 新特性 标准答案: 语义化标签: <header>:页头<nav>:导航<main>:主要内容<article>&#x…...
基于大模型的 UI 自动化系统
基于大模型的 UI 自动化系统 下面是一个完整的 Python 系统,利用大模型实现智能 UI 自动化,结合计算机视觉和自然语言处理技术,实现"看屏操作"的能力。 系统架构设计 #mermaid-svg-2gn2GRvh5WCP2ktF {font-family:"trebuchet ms",verdana,arial,sans-…...
stm32G473的flash模式是单bank还是双bank?
今天突然有人stm32G473的flash模式是单bank还是双bank?由于时间太久,我真忘记了。搜搜发现,还真有人和我一样。见下面的链接:https://shequ.stmicroelectronics.cn/forum.php?modviewthread&tid644563 根据STM32G4系列参考手…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
工业安全零事故的智能守护者:一体化AI智能安防平台
前言: 通过AI视觉技术,为船厂提供全面的安全监控解决方案,涵盖交通违规检测、起重机轨道安全、非法入侵检测、盗窃防范、安全规范执行监控等多个方面,能够实现对应负责人反馈机制,并最终实现数据的统计报表。提升船厂…...
使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装
以下是基于 vant-ui(适配 Vue2 版本 )实现截图中照片上传预览、删除功能,并封装成可复用组件的完整代码,包含样式和逻辑实现,可直接在 Vue2 项目中使用: 1. 封装的图片上传组件 ImageUploader.vue <te…...
【python异步多线程】异步多线程爬虫代码示例
claude生成的python多线程、异步代码示例,模拟20个网页的爬取,每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程:允许程序同时执行多个任务,提高IO密集型任务(如网络请求)的效率…...
AI书签管理工具开发全记录(十九):嵌入资源处理
1.前言 📝 在上一篇文章中,我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源,方便后续将资源打包到一个可执行文件中。 2.embed介绍 🎯 Go 1.16 引入了革命性的 embed 包,彻底改变了静态资源管理的…...
RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)
RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发,后来由Pivotal Software Inc.(现为VMware子公司)接管。RabbitMQ 是一个开源的消息代理和队列服务器,用 Erlang 语言编写。广泛应用于各种分布…...
从面试角度回答Android中ContentProvider启动原理
Android中ContentProvider原理的面试角度解析,分为已启动和未启动两种场景: 一、ContentProvider已启动的情况 1. 核心流程 触发条件:当其他组件(如Activity、Service)通过ContentR…...
