kafka-go操作kafka
package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)var (topic = "user_click"reader *kafka.Reader
)// 生产消息
func writeKafka(ctx context.Context) {writer := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), //不定长参数,支持传入多个broker的ip:portTopic: topic, //为所有message指定统一的topic。如果这里不指定统一的Topic,则创建kafka.Message{}时需要分别指定TopicBalancer: &kafka.Hash{}, //把message的key进行hash,确定partitionWriteTimeout: 1 * time.Second, //设定写超时RequiredAcks: kafka.RequireNone, //RequireNone不需要等待ack返回,效率最高,安全性最低;RequireOne只需要确保Leader写入成功就可以发送下一条消息;RequiredAcks需要确保Leader和所有Follower都写入成功才可以发送下一条消息。AllowAutoTopicCreation: true, //Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目}defer writer.Close() //记得关闭连接for i := 0; i < 3; i++ { //允许重试3次if err := writer.WriteMessages(ctx, //批量写入消息,原子操作,要么全写成功,要么全写失败kafka.Message{Key: []byte("1"), Value: []byte("A")},kafka.Message{Key: []byte("2"), Value: []byte("B")},kafka.Message{Key: []byte("3"), Value: []byte("C")},kafka.Message{Key: []byte("1"), Value: []byte("D")}, //key相同时肯定写入同一个partitionkafka.Message{Key: []byte("2"), Value: []byte("E")},); err != nil {if err == kafka.LeaderNotAvailable { //首次写一个新的Topic时,会发生LeaderNotAvailable错误,重试一次就好了time.Sleep(500 * time.Millisecond)continue} else {fmt.Printf("batch write message failed: %v", err)}} else {break //只要成功一次就不再尝试下一次了}}
}// 消费消息
func readKafka(ctx context.Context) {reader = kafka.NewReader(kafka.ReaderConfig{Brokers: []string{"localhost:9092"}, //支持传入多个broker的ip:portTopic: topic,CommitInterval: 1 * time.Second, //每隔多长时间自动commit一次offset。即一边读一边向kafka上报读到了哪个位置。GroupID: "recommend_biz", //一个Group内消费到的消息不会重复StartOffset: kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效。})// defer reader.Close() //由于下面是死循环,正常情况下readKafka()函数永远不会结束,defer不会执行。所以需要监听信息2和15,当收到信号时关闭reader。需要把reader设为全局变量for { //消息队列里随时可能有新消息进来,所以这里是死循环,类似于读Channelif message, err := reader.ReadMessage(ctx); err != nil {fmt.Printf("read message from kafka failed: %v", err)break} else {offset := message.Offsetfmt.Printf("topic=%s, partition=%d, offset=%d, key=%s, message content=%s\n", message.Topic, message.Partition, offset, string(message.Key), string(message.Value))}}
}// 需要监听信息2和15,当收到信号时关闭reader
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) //注册信号2和15sig := <-c //阻塞,直到信号的到来fmt.Printf("receive signal %s\n", sig.String())if reader != nil {reader.Close()}os.Exit(0) //进程退出
}func main() {ctx := context.Background()// writeKafka(ctx)go listenSignal()readKafka(ctx)
}
相关文章:
kafka-go操作kafka
package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go" )var (topic "user_click"reader *kafka.Reader )// 生产消息 func writeKafka(ctx …...
如何判断被DDoS攻击
当网络和设备正常的情况下,服务器突然出现连接断开、访问卡顿、用户掉线等情况;服务器CPU或内存占用率出现明显增长;网络出入流量出现明显增长;网站或应用程序突然出现大量的未知访问;登录服务器失败或者登录过慢等等。以上是最为常见的服务器被 DDoS攻击后出现的几…...
web —— html
Web —— css基础 1. HTML2. 基本HTML结构3. HTML常用标签3.1 文本相关标签3.2 HTML图像标签3.3 HTML超链接标签3.4 HTML表,单3.4.1 HTML表格3.4.2 HTML表单,输入框(多选框,单选框)下拉框 3.5 HTML分区标签3.5.1 div标…...
【C/PTA】数组练习(编程)
本文结合PTA专项练习带领读者掌握数组,刷题为主注释为辅,在代码中理解思路,其它不做过多叙述。 文章目录 7-1 计算最大值出现的次数7-2 求一批整数中出现最多的个位数字7-3 装箱问题7-4 数组-值钱的微信号7-5 数组-吹泡泡7-6 数组-数学鬼才 7…...
力扣:155. 最小栈(Python3)
题目: 设计一个支持 push ,pop ,top 操作,并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void pop() 删除堆栈顶部的元素。int top() 获取堆栈顶部…...
uniapp实现在线PDF文件预览
下载pdf文件放在static文件夹下 bug:hbuildX创建的项目pdf文件夹可以放在根目录下面,但是cli创建的项目无法预览,只能放在static下面 按钮跳转预览页面 <button click"toPdf">pdf</button>methods: {toPdf() {uni.…...
Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器(一)
下一篇:Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器(二)-CSDN博客 介绍: Windows操作系统中自带了一款记事本应用程序,通常用于记录文字信息,具有简单文本编辑功能。Windows的记事本可以新…...
【系统架构设计】架构核心知识: 3.3 DSSA和ABSD
目录 一 特定领域软件架构DSSA 1 DSSA 2 DSSA的基本活动和产物 3 参与DSSA的人员 4 建立过程...
Git的安装和常用命令Git与SVN的区别Gitee远程仓库团队开发代码共享演示
目录 一、Git入门 1.1 Git简介 1.2 Git与SVN的区别 1.2.1 详解 1.2.2 图解 1.3 Git相较于SVN的优势与劣势 1.3.1 Git的优势与劣势 1.3.2 SVN的优势与劣势 1.4 Git的工作流程 1.4.1 图解 1.4.2 详解 二、Git的安装以及常用命令 2.1 Git官网链接 2.2 安装步骤 2.…...
五、计算机网络
(一)OSI/RM 七层模型 七层模型是计算机网络的基石,整个计算机网络是构建与七层模型之上的。 在数据链路层,数据开始以帧为单位,网卡的 MAC 地址就是数据帧的地址,数据的传输开始有地址了。 局域网是工作…...
使用Grafana与MySQL监控监控网络延迟
文章目录 前言python程序使用Grafana步骤1:安装和配置 Grafana步骤2:配置 Grafana 数据源步骤3:创建 Grafana 仪表盘步骤4:将 Grafana 仪表盘嵌入到博客中 前言 在网络应用中,网络延迟是一个重要的指标,它…...
互联网常见职称
1、管理层 CEO – Chief Executive Officer 首席执行官 VP – Vice President 副总裁 HRD – Humen Resource Director 人力资源总监 OD – Operations Director 运营总监 MD – Marketing Director 市场总监 GM – General Manager 总经理 PM – Production Manager 产品…...
UI设计软件有哪些好用和免费的吗?
在我们分享五个有用的原型工具之前,完成原型,将优化界面,这次是UI设计师的任务,UI设计软件对设计师非常重要,UI设计工具是否使用直接影响最终结果,然后有人会问:UI界面设计使用什么软件…...
Linux开发工具之编译器gcc/g++
文章目录 1.查看版本2.程序的翻译3.gcc指令3.1gcc hello.c -o hello3.2gcc -E hello.c -o hello.i3.3gcc -S hello.c -o hello.s3.4gcc -c hello.c -o hello.o3.5gcc hello.o -o hello 4.动静态库[详讲链接阶段]4.1初步认识4.2动态链接4.3静态链接 1.查看版本 gcc -v. 2.程序…...
【Kurbernetes资源管理】陈述式资源管理方式
陈述式 一、 理论部分1.1 管理K8s资源的基本方法1.1.1 陈述式资源管理方式1.1.2声明式资源管理方式1.1.3 GUI式资源管理方法 1.2 陈述式资源管理方式1.2.1 Kubelet工具简介1.2.2 kubectl 的基本语法1.2.3 Kubectl工具的自动补全功能 1.3 Kubernetes Service1.4 Service 的类型(…...
flink测试map转换函数和process函数
背景 在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…...
【跟小嘉学习JavaWeb开发】第一章 开发环境搭建
系列文章目录 【跟小嘉学习JavaWeb开发】第一章 开发环境搭建 文章目录 系列文章目录[TOC](文章目录) 前言一、JDK的下载与安装1.1、关于JDK的版本问题 二、环境变量配置2.1、配置 JAVA_HOME、CLASSPATH2.2、配置path2.3、启动 cmd 三、编写代码、编译并执行3.1、编写代码&…...
CSS语法、选择器、属性
1.css语法 * 格式:选择器 {属性名1:属性值1;属性名2:属性值2;...}* 选择器:筛选具有相似特征的元素* 注意:* 每一对属性需要使用;隔开,最后一对属性可以不加 2.选择器:筛选具有相似特征的元素 * 分类:1. 基…...
深度学习读取txt训练数据绘制参数曲线图的方法
有一些深度学习模型是并不像yolo系列那样最终输出相应的参数图,有很多训练形成了一个训练log文件,于是需要读取log文件中的内容并绘制成曲线图。 如下实例,有一个log文件的部分截图,需要将其读取出来并绘制曲线图 废话不多说&…...
VB.NET—DataGridView控件教程详解
目录 前言: 过程: 第一步: 第二步: 第三步: 第四步: 第五步: 番外篇: 总结: 前言: DataGridView是.NET FormK中的一个Windows窗体控件,它提供了一个可视化的表格控件,允许用户以表格形式显示和编辑数据。它通常用于显示和编辑数据库…...
C++的输入和输出流详解
输入和输出流从键盘输入数据,输出到显示器屏幕。这种输入输出称为标准的输入输出,简称标准I/O。从磁盘文件输入数据,数据输出到磁盘文件简称文件I/O。对内存中指定的空间进行输入输出,通常指定一个字符数组作为存储空间࿰…...
深入浅出 Elasticsearch 倒排索引:从传统检索到 FST 数据结构的革命
深入浅出 Elasticsearch 倒排索引:从传统检索到 FST 数据结构的革命前言一、从传统检索说起1.1 正向索引(Forward Index)二、倒排索引的核心思想2.1 什么是倒排索引?2.2 倒排索引的组成2.3 构建示例三、倒排索引的进阶结构3.1 常见…...
零基础部署腾讯视觉语言模型:Youtu-VL-4B-Instruct环境配置与快速上手指南
零基础部署腾讯视觉语言模型:Youtu-VL-4B-Instruct环境配置与快速上手指南 1. 引言:为什么选择Youtu-VL-4B-Instruct? 如果你正在寻找一个既能理解图片内容,又能进行自然对话的多模态AI助手,腾讯优图实验室开源的You…...
ArcGIS Desktop实战:如何把图层里零散的面要素一键融合成单个面(附Python读取避坑点)
ArcGIS Desktop实战:零散面要素融合与Python读取避坑指南 当你在处理行政区划合并、地块整合或生态保护区划定时,是否遇到过这样的困扰:图层中密密麻麻的零散面要素不仅影响可视化效果,更会在使用Python进行数据分析时埋下隐患&am…...
Real-ESRGAN-ncnn-vulkan:AI图像超分辨率技术实战指南
Real-ESRGAN-ncnn-vulkan:AI图像超分辨率技术实战指南 【免费下载链接】Real-ESRGAN-ncnn-vulkan NCNN implementation of Real-ESRGAN. Real-ESRGAN aims at developing Practical Algorithms for General Image Restoration. 项目地址: https://gitcode.com/gh_…...
Notepad-- 完全指南:打造你的跨平台中文文本编辑器
Notepad-- 完全指南:打造你的跨平台中文文本编辑器 【免费下载链接】notepad-- 一个支持windows/linux/mac的文本编辑器,目标是做中国人自己的编辑器,来自中国。 项目地址: https://gitcode.com/GitHub_Trending/no/notepad-- 如果你正…...
5个关键决策点:为什么技术领导者选择Testsigma作为下一代AI驱动测试平台
5个关键决策点:为什么技术领导者选择Testsigma作为下一代AI驱动测试平台 【免费下载链接】testsigma Testsigma is an agentic test automation platform powered by AI-coworkers that work alongside QA teams to simplify testing, accelerate releases and impr…...
从一道网鼎杯VM题出发,聊聊逆向工程中‘信号’处理的那些事儿
逆向工程中的"信号"隐喻:从网鼎杯VM题看指令流解码艺术 在CTF逆向工程领域,虚拟机(VM)保护技术一直是令人又爱又恨的存在。去年网鼎杯的这道signal题目,表面上是个典型的VM逆向题,但解题过程中我忽然意识到——我们逆向…...
告别树莓派低电压警告!一个脚本实时监控功耗,并自动优化性能设置
树莓派智能功耗管理:从电压监控到自动化性能调优 树莓派爱好者们可能都见过那个令人不安的黄色闪电图标——低电压警告。这个看似简单的提示背后,隐藏着电源管理、系统稳定性与性能调优的复杂平衡。对于将树莓派用作家庭服务器、物联网网关或边缘计算节…...
深入AT89S52时钟与功耗:如何设计一个省电又可靠的电池供电传感节点?
AT89S52超低功耗传感节点设计实战:从时钟选择到唤醒策略 在野外环境监测、农业物联网或工业设备远程诊断等场景中,电池供电的传感节点往往需要持续工作数月甚至数年。我曾参与过一个高原气象监测项目,节点设备在零下20度的环境中需要依靠单节…...
