kafka-go操作kafka
package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)var (topic = "user_click"reader *kafka.Reader
)// 生产消息
func writeKafka(ctx context.Context) {writer := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), //不定长参数,支持传入多个broker的ip:portTopic: topic, //为所有message指定统一的topic。如果这里不指定统一的Topic,则创建kafka.Message{}时需要分别指定TopicBalancer: &kafka.Hash{}, //把message的key进行hash,确定partitionWriteTimeout: 1 * time.Second, //设定写超时RequiredAcks: kafka.RequireNone, //RequireNone不需要等待ack返回,效率最高,安全性最低;RequireOne只需要确保Leader写入成功就可以发送下一条消息;RequiredAcks需要确保Leader和所有Follower都写入成功才可以发送下一条消息。AllowAutoTopicCreation: true, //Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目}defer writer.Close() //记得关闭连接for i := 0; i < 3; i++ { //允许重试3次if err := writer.WriteMessages(ctx, //批量写入消息,原子操作,要么全写成功,要么全写失败kafka.Message{Key: []byte("1"), Value: []byte("A")},kafka.Message{Key: []byte("2"), Value: []byte("B")},kafka.Message{Key: []byte("3"), Value: []byte("C")},kafka.Message{Key: []byte("1"), Value: []byte("D")}, //key相同时肯定写入同一个partitionkafka.Message{Key: []byte("2"), Value: []byte("E")},); err != nil {if err == kafka.LeaderNotAvailable { //首次写一个新的Topic时,会发生LeaderNotAvailable错误,重试一次就好了time.Sleep(500 * time.Millisecond)continue} else {fmt.Printf("batch write message failed: %v", err)}} else {break //只要成功一次就不再尝试下一次了}}
}// 消费消息
func readKafka(ctx context.Context) {reader = kafka.NewReader(kafka.ReaderConfig{Brokers: []string{"localhost:9092"}, //支持传入多个broker的ip:portTopic: topic,CommitInterval: 1 * time.Second, //每隔多长时间自动commit一次offset。即一边读一边向kafka上报读到了哪个位置。GroupID: "recommend_biz", //一个Group内消费到的消息不会重复StartOffset: kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效。})// defer reader.Close() //由于下面是死循环,正常情况下readKafka()函数永远不会结束,defer不会执行。所以需要监听信息2和15,当收到信号时关闭reader。需要把reader设为全局变量for { //消息队列里随时可能有新消息进来,所以这里是死循环,类似于读Channelif message, err := reader.ReadMessage(ctx); err != nil {fmt.Printf("read message from kafka failed: %v", err)break} else {offset := message.Offsetfmt.Printf("topic=%s, partition=%d, offset=%d, key=%s, message content=%s\n", message.Topic, message.Partition, offset, string(message.Key), string(message.Value))}}
}// 需要监听信息2和15,当收到信号时关闭reader
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) //注册信号2和15sig := <-c //阻塞,直到信号的到来fmt.Printf("receive signal %s\n", sig.String())if reader != nil {reader.Close()}os.Exit(0) //进程退出
}func main() {ctx := context.Background()// writeKafka(ctx)go listenSignal()readKafka(ctx)
}
相关文章:
kafka-go操作kafka
package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go" )var (topic "user_click"reader *kafka.Reader )// 生产消息 func writeKafka(ctx …...
如何判断被DDoS攻击
当网络和设备正常的情况下,服务器突然出现连接断开、访问卡顿、用户掉线等情况;服务器CPU或内存占用率出现明显增长;网络出入流量出现明显增长;网站或应用程序突然出现大量的未知访问;登录服务器失败或者登录过慢等等。以上是最为常见的服务器被 DDoS攻击后出现的几…...
web —— html
Web —— css基础 1. HTML2. 基本HTML结构3. HTML常用标签3.1 文本相关标签3.2 HTML图像标签3.3 HTML超链接标签3.4 HTML表,单3.4.1 HTML表格3.4.2 HTML表单,输入框(多选框,单选框)下拉框 3.5 HTML分区标签3.5.1 div标…...
【C/PTA】数组练习(编程)
本文结合PTA专项练习带领读者掌握数组,刷题为主注释为辅,在代码中理解思路,其它不做过多叙述。 文章目录 7-1 计算最大值出现的次数7-2 求一批整数中出现最多的个位数字7-3 装箱问题7-4 数组-值钱的微信号7-5 数组-吹泡泡7-6 数组-数学鬼才 7…...
力扣:155. 最小栈(Python3)
题目: 设计一个支持 push ,pop ,top 操作,并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void pop() 删除堆栈顶部的元素。int top() 获取堆栈顶部…...
uniapp实现在线PDF文件预览
下载pdf文件放在static文件夹下 bug:hbuildX创建的项目pdf文件夹可以放在根目录下面,但是cli创建的项目无法预览,只能放在static下面 按钮跳转预览页面 <button click"toPdf">pdf</button>methods: {toPdf() {uni.…...
Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器(一)
下一篇:Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器(二)-CSDN博客 介绍: Windows操作系统中自带了一款记事本应用程序,通常用于记录文字信息,具有简单文本编辑功能。Windows的记事本可以新…...
【系统架构设计】架构核心知识: 3.3 DSSA和ABSD
目录 一 特定领域软件架构DSSA 1 DSSA 2 DSSA的基本活动和产物 3 参与DSSA的人员 4 建立过程...
Git的安装和常用命令Git与SVN的区别Gitee远程仓库团队开发代码共享演示
目录 一、Git入门 1.1 Git简介 1.2 Git与SVN的区别 1.2.1 详解 1.2.2 图解 1.3 Git相较于SVN的优势与劣势 1.3.1 Git的优势与劣势 1.3.2 SVN的优势与劣势 1.4 Git的工作流程 1.4.1 图解 1.4.2 详解 二、Git的安装以及常用命令 2.1 Git官网链接 2.2 安装步骤 2.…...
五、计算机网络
(一)OSI/RM 七层模型 七层模型是计算机网络的基石,整个计算机网络是构建与七层模型之上的。 在数据链路层,数据开始以帧为单位,网卡的 MAC 地址就是数据帧的地址,数据的传输开始有地址了。 局域网是工作…...
使用Grafana与MySQL监控监控网络延迟
文章目录 前言python程序使用Grafana步骤1:安装和配置 Grafana步骤2:配置 Grafana 数据源步骤3:创建 Grafana 仪表盘步骤4:将 Grafana 仪表盘嵌入到博客中 前言 在网络应用中,网络延迟是一个重要的指标,它…...
互联网常见职称
1、管理层 CEO – Chief Executive Officer 首席执行官 VP – Vice President 副总裁 HRD – Humen Resource Director 人力资源总监 OD – Operations Director 运营总监 MD – Marketing Director 市场总监 GM – General Manager 总经理 PM – Production Manager 产品…...
UI设计软件有哪些好用和免费的吗?
在我们分享五个有用的原型工具之前,完成原型,将优化界面,这次是UI设计师的任务,UI设计软件对设计师非常重要,UI设计工具是否使用直接影响最终结果,然后有人会问:UI界面设计使用什么软件…...
Linux开发工具之编译器gcc/g++
文章目录 1.查看版本2.程序的翻译3.gcc指令3.1gcc hello.c -o hello3.2gcc -E hello.c -o hello.i3.3gcc -S hello.c -o hello.s3.4gcc -c hello.c -o hello.o3.5gcc hello.o -o hello 4.动静态库[详讲链接阶段]4.1初步认识4.2动态链接4.3静态链接 1.查看版本 gcc -v. 2.程序…...
【Kurbernetes资源管理】陈述式资源管理方式
陈述式 一、 理论部分1.1 管理K8s资源的基本方法1.1.1 陈述式资源管理方式1.1.2声明式资源管理方式1.1.3 GUI式资源管理方法 1.2 陈述式资源管理方式1.2.1 Kubelet工具简介1.2.2 kubectl 的基本语法1.2.3 Kubectl工具的自动补全功能 1.3 Kubernetes Service1.4 Service 的类型(…...
flink测试map转换函数和process函数
背景 在flink中,我们需要对我们写的map转换函数,process处理函数进行单元测试,测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新,本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…...
【跟小嘉学习JavaWeb开发】第一章 开发环境搭建
系列文章目录 【跟小嘉学习JavaWeb开发】第一章 开发环境搭建 文章目录 系列文章目录[TOC](文章目录) 前言一、JDK的下载与安装1.1、关于JDK的版本问题 二、环境变量配置2.1、配置 JAVA_HOME、CLASSPATH2.2、配置path2.3、启动 cmd 三、编写代码、编译并执行3.1、编写代码&…...
CSS语法、选择器、属性
1.css语法 * 格式:选择器 {属性名1:属性值1;属性名2:属性值2;...}* 选择器:筛选具有相似特征的元素* 注意:* 每一对属性需要使用;隔开,最后一对属性可以不加 2.选择器:筛选具有相似特征的元素 * 分类:1. 基…...
深度学习读取txt训练数据绘制参数曲线图的方法
有一些深度学习模型是并不像yolo系列那样最终输出相应的参数图,有很多训练形成了一个训练log文件,于是需要读取log文件中的内容并绘制成曲线图。 如下实例,有一个log文件的部分截图,需要将其读取出来并绘制曲线图 废话不多说&…...
VB.NET—DataGridView控件教程详解
目录 前言: 过程: 第一步: 第二步: 第三步: 第四步: 第五步: 番外篇: 总结: 前言: DataGridView是.NET FormK中的一个Windows窗体控件,它提供了一个可视化的表格控件,允许用户以表格形式显示和编辑数据。它通常用于显示和编辑数据库…...
观成科技:隐蔽隧道工具Ligolo-ng加密流量分析
1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具,该工具基于TUN接口实现其功能,利用反向TCP/TLS连接建立一条隐蔽的通信信道,支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式,适应复杂网…...
【入坑系列】TiDB 强制索引在不同库下不生效问题
文章目录 背景SQL 优化情况线上SQL运行情况分析怀疑1:执行计划绑定问题?尝试:SHOW WARNINGS 查看警告探索 TiDB 的 USE_INDEX 写法Hint 不生效问题排查解决参考背景 项目中使用 TiDB 数据库,并对 SQL 进行优化了,添加了强制索引。 UAT 环境已经生效,但 PROD 环境强制索…...
uni-app学习笔记二十二---使用vite.config.js全局导入常用依赖
在前面的练习中,每个页面需要使用ref,onShow等生命周期钩子函数时都需要像下面这样导入 import {onMounted, ref} from "vue" 如果不想每个页面都导入,需要使用node.js命令npm安装unplugin-auto-import npm install unplugin-au…...
智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
1.3 VSCode安装与环境配置
进入网址Visual Studio Code - Code Editing. Redefined下载.deb文件,然后打开终端,进入下载文件夹,键入命令 sudo dpkg -i code_1.100.3-1748872405_amd64.deb 在终端键入命令code即启动vscode 需要安装插件列表 1.Chinese简化 2.ros …...
【git】把本地更改提交远程新分支feature_g
创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...
Java面试专项一-准备篇
一、企业简历筛选规则 一般企业的简历筛选流程:首先由HR先筛选一部分简历后,在将简历给到对应的项目负责人后再进行下一步的操作。 HR如何筛选简历 例如:Boss直聘(招聘方平台) 直接按照条件进行筛选 例如:…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...
