当前位置: 首页 > news >正文

kafka-go操作kafka

package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)var (topic  = "user_click"reader *kafka.Reader
)// 生产消息
func writeKafka(ctx context.Context) {writer := &kafka.Writer{Addr:                   kafka.TCP("localhost:9092"), //不定长参数,支持传入多个broker的ip:portTopic:                  topic,                       //为所有message指定统一的topic。如果这里不指定统一的Topic,则创建kafka.Message{}时需要分别指定TopicBalancer:               &kafka.Hash{},               //把message的key进行hash,确定partitionWriteTimeout:           1 * time.Second,             //设定写超时RequiredAcks:           kafka.RequireNone,           //RequireNone不需要等待ack返回,效率最高,安全性最低;RequireOne只需要确保Leader写入成功就可以发送下一条消息;RequiredAcks需要确保Leader和所有Follower都写入成功才可以发送下一条消息。AllowAutoTopicCreation: true,                        //Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目}defer writer.Close() //记得关闭连接for i := 0; i < 3; i++ { //允许重试3次if err := writer.WriteMessages(ctx, //批量写入消息,原子操作,要么全写成功,要么全写失败kafka.Message{Key: []byte("1"), Value: []byte("A")},kafka.Message{Key: []byte("2"), Value: []byte("B")},kafka.Message{Key: []byte("3"), Value: []byte("C")},kafka.Message{Key: []byte("1"), Value: []byte("D")}, //key相同时肯定写入同一个partitionkafka.Message{Key: []byte("2"), Value: []byte("E")},); err != nil {if err == kafka.LeaderNotAvailable { //首次写一个新的Topic时,会发生LeaderNotAvailable错误,重试一次就好了time.Sleep(500 * time.Millisecond)continue} else {fmt.Printf("batch write message failed: %v", err)}} else {break //只要成功一次就不再尝试下一次了}}
}// 消费消息
func readKafka(ctx context.Context) {reader = kafka.NewReader(kafka.ReaderConfig{Brokers:        []string{"localhost:9092"}, //支持传入多个broker的ip:portTopic:          topic,CommitInterval: 1 * time.Second,   //每隔多长时间自动commit一次offset。即一边读一边向kafka上报读到了哪个位置。GroupID:        "recommend_biz",   //一个Group内消费到的消息不会重复StartOffset:    kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效。})// defer reader.Close() //由于下面是死循环,正常情况下readKafka()函数永远不会结束,defer不会执行。所以需要监听信息2和15,当收到信号时关闭reader。需要把reader设为全局变量for { //消息队列里随时可能有新消息进来,所以这里是死循环,类似于读Channelif message, err := reader.ReadMessage(ctx); err != nil {fmt.Printf("read message from kafka failed: %v", err)break} else {offset := message.Offsetfmt.Printf("topic=%s, partition=%d, offset=%d, key=%s, message content=%s\n", message.Topic, message.Partition, offset, string(message.Key), string(message.Value))}}
}// 需要监听信息2和15,当收到信号时关闭reader
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) //注册信号2和15sig := <-c                                        //阻塞,直到信号的到来fmt.Printf("receive signal %s\n", sig.String())if reader != nil {reader.Close()}os.Exit(0) //进程退出
}func main() {ctx := context.Background()// writeKafka(ctx)go listenSignal()readKafka(ctx)
} 

相关文章:

kafka-go操作kafka

package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go" )var (topic "user_click"reader *kafka.Reader )// 生产消息 func writeKafka(ctx …...

如何判断被DDoS攻击

当网络和设备正常的情况下&#xff0c;服务器突然出现连接断开、访问卡顿、用户掉线等情况;服务器CPU或内存占用率出现明显增长;网络出入流量出现明显增长;网站或应用程序突然出现大量的未知访问;登录服务器失败或者登录过慢等等。以上是最为常见的服务器被 DDoS攻击后出现的几…...

web —— html

Web —— css基础 1. HTML2. 基本HTML结构3. HTML常用标签3.1 文本相关标签3.2 HTML图像标签3.3 HTML超链接标签3.4 HTML表&#xff0c;单3.4.1 HTML表格3.4.2 HTML表单&#xff0c;输入框&#xff08;多选框&#xff0c;单选框&#xff09;下拉框 3.5 HTML分区标签3.5.1 div标…...

【C/PTA】数组练习(编程)

本文结合PTA专项练习带领读者掌握数组&#xff0c;刷题为主注释为辅&#xff0c;在代码中理解思路&#xff0c;其它不做过多叙述。 文章目录 7-1 计算最大值出现的次数7-2 求一批整数中出现最多的个位数字7-3 装箱问题7-4 数组-值钱的微信号7-5 数组-吹泡泡7-6 数组-数学鬼才 7…...

力扣:155. 最小栈(Python3)

题目&#xff1a; 设计一个支持 push &#xff0c;pop &#xff0c;top 操作&#xff0c;并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void pop() 删除堆栈顶部的元素。int top() 获取堆栈顶部…...

uniapp实现在线PDF文件预览

下载pdf文件放在static文件夹下 bug&#xff1a;hbuildX创建的项目pdf文件夹可以放在根目录下面&#xff0c;但是cli创建的项目无法预览&#xff0c;只能放在static下面 按钮跳转预览页面 <button click"toPdf">pdf</button>methods: {toPdf() {uni.…...

Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器(一)

下一篇&#xff1a;Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器&#xff08;二&#xff09;-CSDN博客 介绍&#xff1a; Windows操作系统中自带了一款记事本应用程序&#xff0c;通常用于记录文字信息&#xff0c;具有简单文本编辑功能。Windows的记事本可以新…...

【系统架构设计】架构核心知识: 3.3 DSSA和ABSD

目录 一 特定领域软件架构DSSA 1 DSSA 2 DSSA的基本活动和产物 3 参与DSSA的人员 4 建立过程...

Git的安装和常用命令Git与SVN的区别Gitee远程仓库团队开发代码共享演示

目录 一、Git入门 1.1 Git简介 1.2 Git与SVN的区别 1.2.1 详解 1.2.2 图解 1.3 Git相较于SVN的优势与劣势 1.3.1 Git的优势与劣势 1.3.2 SVN的优势与劣势 1.4 Git的工作流程 1.4.1 图解 1.4.2 详解 二、Git的安装以及常用命令 2.1 Git官网链接 2.2 安装步骤 2.…...

五、计算机网络

&#xff08;一&#xff09;OSI/RM 七层模型 七层模型是计算机网络的基石&#xff0c;整个计算机网络是构建与七层模型之上的。 在数据链路层&#xff0c;数据开始以帧为单位&#xff0c;网卡的 MAC 地址就是数据帧的地址&#xff0c;数据的传输开始有地址了。 局域网是工作…...

使用Grafana与MySQL监控监控网络延迟

文章目录 前言python程序使用Grafana步骤1&#xff1a;安装和配置 Grafana步骤2&#xff1a;配置 Grafana 数据源步骤3&#xff1a;创建 Grafana 仪表盘步骤4&#xff1a;将 Grafana 仪表盘嵌入到博客中 前言 在网络应用中&#xff0c;网络延迟是一个重要的指标&#xff0c;它…...

互联网常见职称

1、管理层 CEO – Chief Executive Officer 首席执行官 VP – Vice President 副总裁 HRD – Humen Resource Director 人力资源总监 OD – Operations Director 运营总监 MD – Marketing Director 市场总监 GM – General Manager 总经理 PM – Production Manager 产品…...

UI设计软件有哪些好用和免费的吗?

在我们分享五个有用的原型工具之前&#xff0c;完成原型&#xff0c;将优化界面&#xff0c;这次是UI设计师的任务&#xff0c;UI设计软件对设计师非常重要&#xff0c;UI设计工具是否使用直接影响最终结果&#xff0c;然后有人会问&#xff1a;UI界面设计使用什么软件&#xf…...

Linux开发工具之编译器gcc/g++

文章目录 1.查看版本2.程序的翻译3.gcc指令3.1gcc hello.c -o hello3.2gcc -E hello.c -o hello.i3.3gcc -S hello.c -o hello.s3.4gcc -c hello.c -o hello.o3.5gcc hello.o -o hello 4.动静态库[详讲链接阶段]4.1初步认识4.2动态链接4.3静态链接 1.查看版本 gcc -v. 2.程序…...

【Kurbernetes资源管理】陈述式资源管理方式

陈述式 一、 理论部分1.1 管理K8s资源的基本方法1.1.1 陈述式资源管理方式1.1.2声明式资源管理方式1.1.3 GUI式资源管理方法 1.2 陈述式资源管理方式1.2.1 Kubelet工具简介1.2.2 kubectl 的基本语法1.2.3 Kubectl工具的自动补全功能 1.3 Kubernetes Service1.4 Service 的类型(…...

flink测试map转换函数和process函数

背景 在flink中&#xff0c;我们需要对我们写的map转换函数&#xff0c;process处理函数进行单元测试&#xff0c;测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新&#xff0c;本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…...

【跟小嘉学习JavaWeb开发】第一章 开发环境搭建

系列文章目录 【跟小嘉学习JavaWeb开发】第一章 开发环境搭建 文章目录 系列文章目录[TOC](文章目录) 前言一、JDK的下载与安装1.1、关于JDK的版本问题 二、环境变量配置2.1、配置 JAVA_HOME、CLASSPATH2.2、配置path2.3、启动 cmd 三、编写代码、编译并执行3.1、编写代码&…...

CSS语法、选择器、属性

1.css语法 * 格式&#xff1a;选择器 {属性名1:属性值1;属性名2:属性值2;...}* 选择器:筛选具有相似特征的元素* 注意&#xff1a;* 每一对属性需要使用&#xff1b;隔开&#xff0c;最后一对属性可以不加 2.选择器&#xff1a;筛选具有相似特征的元素 * 分类&#xff1a;1. 基…...

深度学习读取txt训练数据绘制参数曲线图的方法

有一些深度学习模型是并不像yolo系列那样最终输出相应的参数图&#xff0c;有很多训练形成了一个训练log文件&#xff0c;于是需要读取log文件中的内容并绘制成曲线图。 如下实例&#xff0c;有一个log文件的部分截图&#xff0c;需要将其读取出来并绘制曲线图 废话不多说&…...

VB.NET—DataGridView控件教程详解

目录 前言: 过程: 第一步: 第二步: 第三步: 第四步: 第五步&#xff1a; 番外篇: 总结: 前言: DataGridView是.NET FormK中的一个Windows窗体控件&#xff0c;它提供了一个可视化的表格控件&#xff0c;允许用户以表格形式显示和编辑数据。它通常用于显示和编辑数据库…...

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…...

【CSS position 属性】static、relative、fixed、absolute 、sticky详细介绍,多层嵌套定位示例

文章目录 ★ position 的五种类型及基本用法 ★ 一、position 属性概述 二、position 的五种类型详解(初学者版) 1. static(默认值) 2. relative(相对定位) 3. absolute(绝对定位) 4. fixed(固定定位) 5. sticky(粘性定位) 三、定位元素的层级关系(z-i…...

【git】把本地更改提交远程新分支feature_g

创建并切换新分支 git checkout -b feature_g 添加并提交更改 git add . git commit -m “实现图片上传功能” 推送到远程 git push -u origin feature_g...

JavaScript基础-API 和 Web API

在学习JavaScript的过程中&#xff0c;理解API&#xff08;应用程序接口&#xff09;和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能&#xff0c;使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...

MySQL JOIN 表过多的优化思路

当 MySQL 查询涉及大量表 JOIN 时&#xff0c;性能会显著下降。以下是优化思路和简易实现方法&#xff1a; 一、核心优化思路 减少 JOIN 数量 数据冗余&#xff1a;添加必要的冗余字段&#xff08;如订单表直接存储用户名&#xff09;合并表&#xff1a;将频繁关联的小表合并成…...

起重机起升机构的安全装置有哪些?

起重机起升机构的安全装置是保障吊装作业安全的关键部件&#xff0c;主要用于防止超载、失控、断绳等危险情况。以下是常见的安全装置及其功能和原理&#xff1a; 一、超载保护装置&#xff08;核心安全装置&#xff09; 1. 起重量限制器 功能&#xff1a;实时监测起升载荷&a…...

【大厂机试题解法笔记】矩阵匹配

题目 从一个 N * M&#xff08;N ≤ M&#xff09;的矩阵中选出 N 个数&#xff0c;任意两个数字不能在同一行或同一列&#xff0c;求选出来的 N 个数中第 K 大的数字的最小值是多少。 输入描述 输入矩阵要求&#xff1a;1 ≤ K ≤ N ≤ M ≤ 150 输入格式 N M K N*M矩阵 输…...

【Vue】scoped+组件通信+props校验

【scoped作用及原理】 【作用】 默认写在组件中style的样式会全局生效, 因此很容易造成多个组件之间的样式冲突问题 故而可以给组件加上scoped 属性&#xff0c; 令样式只作用于当前组件的标签 作用&#xff1a;防止不同vue组件样式污染 【原理】 给组件加上scoped 属性后…...

rk3506上移植lvgl应用

本文档介绍如何在开发板上运行以及移植LVGL。 1. 移植准备 硬件环境:开发板及其配套屏幕 开发板镜像 主机环境:Ubuntu 22.04.5 2. LVGL启动 ​ 出厂系统默认配置了 LVGL,并且上电之后默认会启动 一个LVGL应用 。 LVGL 的启动脚本为/etc/init.d/pre_init/S00-lv_demo,…...

【QT】qtdesigner中将控件提升为自定义控件后,css设置样式不生效(已解决,图文详情)

目录 0.背景 1.解决思路 2.详细代码 0.背景 实际项目中遇到的问题&#xff0c;描述如下&#xff1a; 我在qtdesigner用界面拖了一个QTableView控件&#xff0c;object name为【tableView_electrode】&#xff0c;然后【提升为】了自定义的类【Steer_Electrode_Table】&…...