kafka-go操作kafka
package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)var (topic = "user_click"reader *kafka.Reader
)// 生产消息
func writeKafka(ctx context.Context) {writer := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), //不定长参数,支持传入多个broker的ip:portTopic: topic, //为所有message指定统一的topic。如果这里不指定统一的Topic,则创建kafka.Message{}时需要分别指定TopicBalancer: &kafka.Hash{}, //把message的key进行hash,确定partitionWriteTimeout: 1 * time.Second, //设定写超时RequiredAcks: kafka.RequireNone, //RequireNone不需要等待ack返回,效率最高,安全性最低;RequireOne只需要确保Leader写入成功就可以发送下一条消息;RequiredAcks需要确保Leader和所有Follower都写入成功才可以发送下一条消息。AllowAutoTopicCreation: true, //Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目}defer writer.Close() //记得关闭连接for i := 0; i < 3; i++ { //允许重试3次if err := writer.WriteMessages(ctx, //批量写入消息,原子操作,要么全写成功,要么全写失败kafka.Message{Key: []byte("1"), Value: []byte("A")},kafka.Message{Key: []byte("2"), Value: []byte("B")},kafka.Message{Key: []byte("3"), Value: []byte("C")},kafka.Message{Key: []byte("1"), Value: []byte("D")}, //key相同时肯定写入同一个partitionkafka.Message{Key: []byte("2"), Value: []byte("E")},); err != nil {if err == kafka.LeaderNotAvailable { //首次写一个新的Topic时,会发生LeaderNotAvailable错误,重试一次就好了time.Sleep(500 * time.Millisecond)continue} else {fmt.Printf("batch write message failed: %v", err)}} else {break //只要成功一次就不再尝试下一次了}}
}// 消费消息
func readKafka(ctx context.Context) {reader = kafka.NewReader(kafka.ReaderConfig{Brokers: []string{"localhost:9092"}, //支持传入多个broker的ip:portTopic: topic,CommitInterval: 1 * time.Second, //每隔多长时间自动commit一次offset。即一边读一边向kafka上报读到了哪个位置。GroupID: "recommend_biz", //一个Group内消费到的消息不会重复StartOffset: kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效。})// defer reader.Close() //由于下面是死循环,正常情况下readKafka()函数永远不会结束,defer不会执行。所以需要监听信息2和15,当收到信号时关闭reader。需要把reader设为全局变量for { //消息队列里随时可能有新消息进来,所以这里是死循环,类似于读Channelif message, err := reader.ReadMessage(ctx); err != nil {fmt.Printf("read message from kafka failed: %v", err)break} else {offset := message.Offsetfmt.Printf("topic=%s, partition=%d, offset=%d, key=%s, message content=%s\n", message.Topic, message.Partition, offset, string(message.Key), string(message.Value))}}
}// 需要监听信息2和15,当收到信号时关闭reader
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) //注册信号2和15sig := <-c //阻塞,直到信号的到来fmt.Printf("receive signal %s\n", sig.String())if reader != nil {reader.Close()}os.Exit(0) //进程退出
}func main() {ctx := context.Background()// writeKafka(ctx)go listenSignal()readKafka(ctx)
}
相关文章:
kafka-go操作kafka
package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go" )var (topic "user_click"reader *kafka.Reader )// 生产消息 func writeKafka(ctx …...
如何判断被DDoS攻击
当网络和设备正常的情况下,服务器突然出现连接断开、访问卡顿、用户掉线等情况;服务器CPU或内存占用率出现明显增长;网络出入流量出现明显增长;网站或应用程序突然出现大量的未知访问;登录服务器失败或者登录过慢等等。以上是最为常见的服务器被 DDoS攻击后出现的几…...
web —— html
Web —— css基础 1. HTML2. 基本HTML结构3. HTML常用标签3.1 文本相关标签3.2 HTML图像标签3.3 HTML超链接标签3.4 HTML表,单3.4.1 HTML表格3.4.2 HTML表单,输入框(多选框,单选框)下拉框 3.5 HTML分区标签3.5.1 div标…...
【C/PTA】数组练习(编程)
本文结合PTA专项练习带领读者掌握数组,刷题为主注释为辅,在代码中理解思路,其它不做过多叙述。 文章目录 7-1 计算最大值出现的次数7-2 求一批整数中出现最多的个位数字7-3 装箱问题7-4 数组-值钱的微信号7-5 数组-吹泡泡7-6 数组-数学鬼才 7…...
力扣:155. 最小栈(Python3)
题目: 设计一个支持 push ,pop ,top 操作,并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void pop() 删除堆栈顶部的元素。int top() 获取堆栈顶部…...
uniapp实现在线PDF文件预览
下载pdf文件放在static文件夹下 bug:hbuildX创建的项目pdf文件夹可以放在根目录下面,但是cli创建的项目无法预览,只能放在static下面 按钮跳转预览页面 <button click"toPdf">pdf</button>methods: {toPdf() {uni.…...
Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器(一)
下一篇:Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器(二)-CSDN博客 介绍: Windows操作系统中自带了一款记事本应用程序,通常用于记录文字信息,具有简单文本编辑功能。Windows的记事本可以新…...
【系统架构设计】架构核心知识: 3.3 DSSA和ABSD
目录 一 特定领域软件架构DSSA 1 DSSA 2 DSSA的基本活动和产物 3 参与DSSA的人员 4 建立过程...
Git的安装和常用命令Git与SVN的区别Gitee远程仓库团队开发代码共享演示
目录 一、Git入门 1.1 Git简介 1.2 Git与SVN的区别 1.2.1 详解 1.2.2 图解 1.3 Git相较于SVN的优势与劣势 1.3.1 Git的优势与劣势 1.3.2 SVN的优势与劣势 1.4 Git的工作流程 1.4.1 图解 1.4.2 详解 二、Git的安装以及常用命令 2.1 Git官网链接 2.2 安装步骤 2.…...
五、计算机网络
(一)OSI/RM 七层模型 七层模型是计算机网络的基石,整个计算机网络是构建与七层模型之上的。 在数据链路层,数据开始以帧为单位,网卡的 MAC 地址就是数据帧的地址,数据的传输开始有地址了。 局域网是工作…...
使用Grafana与MySQL监控监控网络延迟
文章目录 前言python程序使用Grafana步骤1:安装和配置 Grafana步骤2:配置 Grafana 数据源步骤3:创建 Grafana 仪表盘步骤4:将 Grafana 仪表盘嵌入到博客中 前言 在网络应用中,网络延迟是一个重要的指标,它…...
互联网常见职称
1、管理层 CEO – Chief Executive Officer 首席执行官 VP – Vice President 副总裁 HRD – Humen Resource Director 人力资源总监 OD – Operations Director 运营总监 MD – Marketing Director 市场总监 GM – General Manager 总经理 PM – Production Manager 产品…...
UI设计软件有哪些好用和免费的吗?
在我们分享五个有用的原型工具之前,完成原型,将优化界面,这次是UI设计师的任务,UI设计软件对设计师非常重要,UI设计工具是否使用直接影响最终结果,然后有人会问:UI界面设计使用什么软件…...
Linux开发工具之编译器gcc/g++
文章目录 1.查看版本2.程序的翻译3.gcc指令3.1gcc hello.c -o hello3.2gcc -E hello.c -o hello.i3.3gcc -S hello.c -o hello.s3.4gcc -c hello.c -o hello.o3.5gcc hello.o -o hello 4.动静态库[详讲链接阶段]4.1初步认识4.2动态链接4.3静态链接 1.查看版本 gcc -v. 2.程序…...
【Kurbernetes资源管理】陈述式资源管理方式
陈述式 一、 理论部分1.1 管理K8s资源的基本方法1.1.1 陈述式资源管理方式1.1.2声明式资源管理方式1.1.3 GUI式资源管理方法 1.2 陈述式资源管理方式1.2.1 Kubelet工具简介1.2.2 kubectl 的基本语法1.2.3 Kubectl工具的自动补全功能 1.3 Kubernetes Service1.4 Service 的类型(…...
flink测试map转换函数和process函数
背景 在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…...
【跟小嘉学习JavaWeb开发】第一章 开发环境搭建
系列文章目录 【跟小嘉学习JavaWeb开发】第一章 开发环境搭建 文章目录 系列文章目录[TOC](文章目录) 前言一、JDK的下载与安装1.1、关于JDK的版本问题 二、环境变量配置2.1、配置 JAVA_HOME、CLASSPATH2.2、配置path2.3、启动 cmd 三、编写代码、编译并执行3.1、编写代码&…...
CSS语法、选择器、属性
1.css语法 * 格式:选择器 {属性名1:属性值1;属性名2:属性值2;...}* 选择器:筛选具有相似特征的元素* 注意:* 每一对属性需要使用;隔开,最后一对属性可以不加 2.选择器:筛选具有相似特征的元素 * 分类:1. 基…...
深度学习读取txt训练数据绘制参数曲线图的方法
有一些深度学习模型是并不像yolo系列那样最终输出相应的参数图,有很多训练形成了一个训练log文件,于是需要读取log文件中的内容并绘制成曲线图。 如下实例,有一个log文件的部分截图,需要将其读取出来并绘制曲线图 废话不多说&…...
VB.NET—DataGridView控件教程详解
目录 前言: 过程: 第一步: 第二步: 第三步: 第四步: 第五步: 番外篇: 总结: 前言: DataGridView是.NET FormK中的一个Windows窗体控件,它提供了一个可视化的表格控件,允许用户以表格形式显示和编辑数据。它通常用于显示和编辑数据库…...
golang如何实现多活架构方案_golang多活架构方案实现教程
多活核心是流量调度而非服务启动,需在注册、发现、路由、重试等全链路显式支持region标签与fallback。Golang因轻量稳定适配手写逻辑,读多活写单中心是务实起点,DNS/K8s/grpc默认机制均需绕过,必须通过context传region、自定义res…...
微软Azure AKS部署Magma云原生5G核心网实战指南
1. 项目概述:从“熔岩”到云原生电信核心网 如果你在电信行业或者云原生技术圈里待过一阵子,大概率听说过“Magma”这个名字。这可不是什么火山喷发的岩浆,而是一个由Meta(原Facebook)发起,并已捐赠给Linu…...
5个核心功能+3种应用场景:NSC_BUILDER让您的Switch游戏管理更高效
5个核心功能3种应用场景:NSC_BUILDER让您的Switch游戏管理更高效 【免费下载链接】NSC_BUILDER Nintendo Switch Cleaner and Builder. A batchfile, python and html script based in hacbuild and Nuts python libraries. Designed initially to erase titleright…...
别再为Unity WebGL播放本地视频发愁了!VideoPlayer + StreamingAssets保姆级避坑指南
Unity WebGL本地视频播放全攻略:VideoPlayer与StreamingAssets深度解析 第一次在Unity WebGL项目中尝试播放本地视频时,我遇到了一个令人抓狂的问题——视频在编辑器里运行完美,但打包后却死活不显示。经过整整两天的调试才发现,原…...
[具身智能-465]:声学特征与梅尔频谱图
梅尔频谱图(Mel-spectrogram)本质上就是一种最主流、最重要的声学特征。我们可以这样理解它们的关系:“声学特征”是一个广义的类别概念,而“梅尔频谱图”是这个类别下目前应用最广泛的具体形式。为了让更清晰地理解这两个概念及其…...
掌握AI大模型,抢占未来先机:从零开始构建你的智能应用!
本文介绍了人工智能的发展历程、应用范围及挑战,重点强调了新一代AI大模型的应用潜力与开发门槛的降低。作者通过亲身体验ChatGPT等AI工具,呼吁大家拥抱AI浪潮,学习新一代AI应用编程。文章提出,AI技术的发展是为了服务人类&#x…...
80年代法国电视加密技术Discret 11:曾改变行业格局,却因盗版停用
【FABIEN SANGLARDS WEBSITE相关信息】 网站提供了联系方式(CONTACT)、RSS订阅(RSS)和捐赠渠道(DONATE)。时间为2020年6月7日,主题是80年代法国电视加密技术Discret 11。 【80年代法国电视情况】…...
打开文件有多难?Flatpak 安全分析暴露问题,修复后更安全
艰难地打开一个文件在不同场景下,打开文件难度不同。若开发涉及安全边界且与文件有关的东西,打开文件可能极其困难。在最坏情况下,安全边界两侧进程操作共享文件系统树,会面临子路径含 ..、路径组件为符号链接、TOCTOU 竞态等问题…...
WaveDrom:5个技巧快速掌握专业数字时序图生成器
WaveDrom:5个技巧快速掌握专业数字时序图生成器 【免费下载链接】wavedrom :ocean: Digital timing diagram rendering engine 项目地址: https://gitcode.com/gh_mirrors/wa/wavedrom 还在为绘制复杂的数字电路时序图而烦恼吗?每次设计文档更新都…...
3大YOLOv11多光谱目标检测实战痛点诊断与修复指南
3大YOLOv11多光谱目标检测实战痛点诊断与修复指南 【免费下载链接】ultralytics Ultralytics YOLO 🚀 项目地址: https://gitcode.com/GitHub_Trending/ul/ultralytics 在农业遥感监测、夜间安防监控、医疗影像分析等场景中,多光谱目标检测技术凭…...
