当前位置: 首页 > news >正文

kafka-go操作kafka

package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go"
)var (topic  = "user_click"reader *kafka.Reader
)// 生产消息
func writeKafka(ctx context.Context) {writer := &kafka.Writer{Addr:                   kafka.TCP("localhost:9092"), //不定长参数,支持传入多个broker的ip:portTopic:                  topic,                       //为所有message指定统一的topic。如果这里不指定统一的Topic,则创建kafka.Message{}时需要分别指定TopicBalancer:               &kafka.Hash{},               //把message的key进行hash,确定partitionWriteTimeout:           1 * time.Second,             //设定写超时RequiredAcks:           kafka.RequireNone,           //RequireNone不需要等待ack返回,效率最高,安全性最低;RequireOne只需要确保Leader写入成功就可以发送下一条消息;RequiredAcks需要确保Leader和所有Follower都写入成功才可以发送下一条消息。AllowAutoTopicCreation: true,                        //Topic不存在时自动创建。生产环境中一般设为false,由运维管理员创建Topic并配置partition数目}defer writer.Close() //记得关闭连接for i := 0; i < 3; i++ { //允许重试3次if err := writer.WriteMessages(ctx, //批量写入消息,原子操作,要么全写成功,要么全写失败kafka.Message{Key: []byte("1"), Value: []byte("A")},kafka.Message{Key: []byte("2"), Value: []byte("B")},kafka.Message{Key: []byte("3"), Value: []byte("C")},kafka.Message{Key: []byte("1"), Value: []byte("D")}, //key相同时肯定写入同一个partitionkafka.Message{Key: []byte("2"), Value: []byte("E")},); err != nil {if err == kafka.LeaderNotAvailable { //首次写一个新的Topic时,会发生LeaderNotAvailable错误,重试一次就好了time.Sleep(500 * time.Millisecond)continue} else {fmt.Printf("batch write message failed: %v", err)}} else {break //只要成功一次就不再尝试下一次了}}
}// 消费消息
func readKafka(ctx context.Context) {reader = kafka.NewReader(kafka.ReaderConfig{Brokers:        []string{"localhost:9092"}, //支持传入多个broker的ip:portTopic:          topic,CommitInterval: 1 * time.Second,   //每隔多长时间自动commit一次offset。即一边读一边向kafka上报读到了哪个位置。GroupID:        "recommend_biz",   //一个Group内消费到的消息不会重复StartOffset:    kafka.FirstOffset, //当一个特定的partition没有commited offset时(比如第一次读一个partition,之前没有commit过),通过StartOffset指定从第一个还是最后一个位置开始消费。StartOffset的取值要么是FirstOffset要么是LastOffset,LastOffset表示Consumer启动之前生成的老数据不管了。仅当指定了GroupID时,StartOffset才生效。})// defer reader.Close() //由于下面是死循环,正常情况下readKafka()函数永远不会结束,defer不会执行。所以需要监听信息2和15,当收到信号时关闭reader。需要把reader设为全局变量for { //消息队列里随时可能有新消息进来,所以这里是死循环,类似于读Channelif message, err := reader.ReadMessage(ctx); err != nil {fmt.Printf("read message from kafka failed: %v", err)break} else {offset := message.Offsetfmt.Printf("topic=%s, partition=%d, offset=%d, key=%s, message content=%s\n", message.Topic, message.Partition, offset, string(message.Key), string(message.Value))}}
}// 需要监听信息2和15,当收到信号时关闭reader
func listenSignal() {c := make(chan os.Signal, 1)signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) //注册信号2和15sig := <-c                                        //阻塞,直到信号的到来fmt.Printf("receive signal %s\n", sig.String())if reader != nil {reader.Close()}os.Exit(0) //进程退出
}func main() {ctx := context.Background()// writeKafka(ctx)go listenSignal()readKafka(ctx)
} 

相关文章:

kafka-go操作kafka

package mainimport ("context""fmt""os""os/signal""syscall""time""github.com/segmentio/kafka-go" )var (topic "user_click"reader *kafka.Reader )// 生产消息 func writeKafka(ctx …...

如何判断被DDoS攻击

当网络和设备正常的情况下&#xff0c;服务器突然出现连接断开、访问卡顿、用户掉线等情况;服务器CPU或内存占用率出现明显增长;网络出入流量出现明显增长;网站或应用程序突然出现大量的未知访问;登录服务器失败或者登录过慢等等。以上是最为常见的服务器被 DDoS攻击后出现的几…...

web —— html

Web —— css基础 1. HTML2. 基本HTML结构3. HTML常用标签3.1 文本相关标签3.2 HTML图像标签3.3 HTML超链接标签3.4 HTML表&#xff0c;单3.4.1 HTML表格3.4.2 HTML表单&#xff0c;输入框&#xff08;多选框&#xff0c;单选框&#xff09;下拉框 3.5 HTML分区标签3.5.1 div标…...

【C/PTA】数组练习(编程)

本文结合PTA专项练习带领读者掌握数组&#xff0c;刷题为主注释为辅&#xff0c;在代码中理解思路&#xff0c;其它不做过多叙述。 文章目录 7-1 计算最大值出现的次数7-2 求一批整数中出现最多的个位数字7-3 装箱问题7-4 数组-值钱的微信号7-5 数组-吹泡泡7-6 数组-数学鬼才 7…...

力扣:155. 最小栈(Python3)

题目&#xff1a; 设计一个支持 push &#xff0c;pop &#xff0c;top 操作&#xff0c;并能在常数时间内检索到最小元素的栈。 实现 MinStack 类: MinStack() 初始化堆栈对象。void push(int val) 将元素val推入堆栈。void pop() 删除堆栈顶部的元素。int top() 获取堆栈顶部…...

uniapp实现在线PDF文件预览

下载pdf文件放在static文件夹下 bug&#xff1a;hbuildX创建的项目pdf文件夹可以放在根目录下面&#xff0c;但是cli创建的项目无法预览&#xff0c;只能放在static下面 按钮跳转预览页面 <button click"toPdf">pdf</button>methods: {toPdf() {uni.…...

Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器(一)

下一篇&#xff1a;Python tkinter实现复刻Windows记事本UI和菜单的文本编辑器&#xff08;二&#xff09;-CSDN博客 介绍&#xff1a; Windows操作系统中自带了一款记事本应用程序&#xff0c;通常用于记录文字信息&#xff0c;具有简单文本编辑功能。Windows的记事本可以新…...

【系统架构设计】架构核心知识: 3.3 DSSA和ABSD

目录 一 特定领域软件架构DSSA 1 DSSA 2 DSSA的基本活动和产物 3 参与DSSA的人员 4 建立过程...

Git的安装和常用命令Git与SVN的区别Gitee远程仓库团队开发代码共享演示

目录 一、Git入门 1.1 Git简介 1.2 Git与SVN的区别 1.2.1 详解 1.2.2 图解 1.3 Git相较于SVN的优势与劣势 1.3.1 Git的优势与劣势 1.3.2 SVN的优势与劣势 1.4 Git的工作流程 1.4.1 图解 1.4.2 详解 二、Git的安装以及常用命令 2.1 Git官网链接 2.2 安装步骤 2.…...

五、计算机网络

&#xff08;一&#xff09;OSI/RM 七层模型 七层模型是计算机网络的基石&#xff0c;整个计算机网络是构建与七层模型之上的。 在数据链路层&#xff0c;数据开始以帧为单位&#xff0c;网卡的 MAC 地址就是数据帧的地址&#xff0c;数据的传输开始有地址了。 局域网是工作…...

使用Grafana与MySQL监控监控网络延迟

文章目录 前言python程序使用Grafana步骤1&#xff1a;安装和配置 Grafana步骤2&#xff1a;配置 Grafana 数据源步骤3&#xff1a;创建 Grafana 仪表盘步骤4&#xff1a;将 Grafana 仪表盘嵌入到博客中 前言 在网络应用中&#xff0c;网络延迟是一个重要的指标&#xff0c;它…...

互联网常见职称

1、管理层 CEO – Chief Executive Officer 首席执行官 VP – Vice President 副总裁 HRD – Humen Resource Director 人力资源总监 OD – Operations Director 运营总监 MD – Marketing Director 市场总监 GM – General Manager 总经理 PM – Production Manager 产品…...

UI设计软件有哪些好用和免费的吗?

在我们分享五个有用的原型工具之前&#xff0c;完成原型&#xff0c;将优化界面&#xff0c;这次是UI设计师的任务&#xff0c;UI设计软件对设计师非常重要&#xff0c;UI设计工具是否使用直接影响最终结果&#xff0c;然后有人会问&#xff1a;UI界面设计使用什么软件&#xf…...

Linux开发工具之编译器gcc/g++

文章目录 1.查看版本2.程序的翻译3.gcc指令3.1gcc hello.c -o hello3.2gcc -E hello.c -o hello.i3.3gcc -S hello.c -o hello.s3.4gcc -c hello.c -o hello.o3.5gcc hello.o -o hello 4.动静态库[详讲链接阶段]4.1初步认识4.2动态链接4.3静态链接 1.查看版本 gcc -v. 2.程序…...

【Kurbernetes资源管理】陈述式资源管理方式

陈述式 一、 理论部分1.1 管理K8s资源的基本方法1.1.1 陈述式资源管理方式1.1.2声明式资源管理方式1.1.3 GUI式资源管理方法 1.2 陈述式资源管理方式1.2.1 Kubelet工具简介1.2.2 kubectl 的基本语法1.2.3 Kubectl工具的自动补全功能 1.3 Kubernetes Service1.4 Service 的类型(…...

flink测试map转换函数和process函数

背景 在flink中&#xff0c;我们需要对我们写的map转换函数&#xff0c;process处理函数进行单元测试&#xff0c;测试的内容包括查看函数的输出结果是否符合以及函数内的状态是否正确更新&#xff0c;本文就记录几个测试过程中的要点 flink中测试函数 首先我们根据我们要测…...

【跟小嘉学习JavaWeb开发】第一章 开发环境搭建

系列文章目录 【跟小嘉学习JavaWeb开发】第一章 开发环境搭建 文章目录 系列文章目录[TOC](文章目录) 前言一、JDK的下载与安装1.1、关于JDK的版本问题 二、环境变量配置2.1、配置 JAVA_HOME、CLASSPATH2.2、配置path2.3、启动 cmd 三、编写代码、编译并执行3.1、编写代码&…...

CSS语法、选择器、属性

1.css语法 * 格式&#xff1a;选择器 {属性名1:属性值1;属性名2:属性值2;...}* 选择器:筛选具有相似特征的元素* 注意&#xff1a;* 每一对属性需要使用&#xff1b;隔开&#xff0c;最后一对属性可以不加 2.选择器&#xff1a;筛选具有相似特征的元素 * 分类&#xff1a;1. 基…...

深度学习读取txt训练数据绘制参数曲线图的方法

有一些深度学习模型是并不像yolo系列那样最终输出相应的参数图&#xff0c;有很多训练形成了一个训练log文件&#xff0c;于是需要读取log文件中的内容并绘制成曲线图。 如下实例&#xff0c;有一个log文件的部分截图&#xff0c;需要将其读取出来并绘制曲线图 废话不多说&…...

VB.NET—DataGridView控件教程详解

目录 前言: 过程: 第一步: 第二步: 第三步: 第四步: 第五步&#xff1a; 番外篇: 总结: 前言: DataGridView是.NET FormK中的一个Windows窗体控件&#xff0c;它提供了一个可视化的表格控件&#xff0c;允许用户以表格形式显示和编辑数据。它通常用于显示和编辑数据库…...

Caliper 配置文件解析:config.yaml 和 fisco-bcos.json 附加在caliper中执行不同的合约方法

Caliper 配置文件解析:config.yaml 和 fisco-bcos.json Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO…...

git删除本地分支和远程分支

删除本地分支 git branch -d 分支名删除远程分支 git push origin --delete 分支名...

《开篇:课程目录》

大家好&#xff01;我是一名.NET技术开发者&#xff0c;长期以来积累了比较多的项目实战经验&#xff0c;现在把它分享给大家&#xff0c;希望能够帮助到大家&#xff0c;同时为.NET社区提供一份力量&#xff0c;让更多的开发者参与进来。 要讲解的课程如下&#xff1a; 《介绍…...

深入理解卷积神经网络:从原理到应用

在人工智能领域&#xff0c;卷积神经网络&#xff08;Convolutional Neural Network, CNN&#xff09;无疑是计算机视觉领域的璀璨明珠。从 1998 年 Yann LeCun 提出 LeNet-5 实现手写数字识别&#xff0c;到 2012 年 AlexNet 在 ImageNet 大赛上创造历史性突破&#xff0c;CNN…...

javaweb -html -CSS

HTML是一种超文本标记语言 超文本&#xff1a;超过了文本的限制&#xff0c;比普通文本更强大&#xff0c;除了文字信息&#xff0c;还可以定义图片、音频、视频等内容。 标记语言&#xff1a;由标签"<标签名>"构成的语言。 CSS:层叠样式表&#xff0c;用于…...

Ubuntu挂载本地镜像源(像CentOS 一样挂载本地镜像源)

1.挂载 ISO 镜像 sudo mount -o loop /ubuntu-22.04.5-desktop-amd64.iso /mnt/iso 2.备份现有的软件源配置文件&#xff1a; sudo cp /etc/apt/sources.list /etc/apt/sources.list.bak 3.编辑软件源配置文件 编辑 /etc/apt/sources.list sudo nano /etc/apt/sources.l…...

Linux68 FTP 测试 上传下载

6.在vi编辑器里&#xff0c;哪个命令能将光标移到第200行&#xff1f;&#xff08; B &#xff09; 7.A、200g B、:200 C、g200 D、G200 假如您需要找出 /etc/my.conf 文件属于哪个包 (package) &#xff0c;您可以执行&#xff08; D &#xff09;C A、 rpm -q /etc/my.co…...

Linux系统编程-DAY10(TCP操作)

一、网络模型 1、服务器/客户端模型 &#xff08;1&#xff09;C/S&#xff1a;client server &#xff08;2&#xff09;B/S&#xff1a;browser server &#xff08;3&#xff09;P2P&#xff1a;peer to peer 2、C/S与B/S区别 &#xff08;1&#xff09;客户端不同&#…...

用电脑通过网口控制keysight示波器

KEYSIGHT示波器HD304MSO性能 亮点: 体验 200 MHz 至 1 GHz 的带宽和 4 个模拟通道。与 12 位 ADC 相比,使用 14 位模数转换器 (ADC) 将垂直分辨率提高四倍。使用 10.1 英寸电容式触摸屏轻松查看和分析您的信号。捕获 50 μVRMS 本底噪声的较小信号。使用独有区域触摸在几秒…...

《如何使用MinGW-w64编译OpenCV和opencv_contrib》

《如何使用MinGW-w64编译OpenCV和opencv_contrib》 在Windows环境下使用MinGW编译OpenCV和opencv_contrib是一个常见需求,尤其是对于那些希望使用GCC工具链而非Visual Studio的开发者。下面我将详细介绍这个过程。 准备工作 首先需要安装和准备以下工具和库: MinGW(建议使…...