当前位置: 首页 > news >正文

go语言kafka入门

消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据

消息队列相关概念:

生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。

消费者(Consumer):从消息队列中接收和处理消息的应用程序或系统组件。

主题(Topic):消息队列中用于分类和分组消息的逻辑概念,生产者将消息发送到指定的主题,而消费者可以订阅特定的主题以接收相应的消息。

队列(Queue):消息队列中存储消息的容器,遵循先进先出(FIFO)的原则。

发布-订阅模式(Publish-Subscribe Pattern):一种消息分发模式,生产者将消息发送到一个或多个主题,而消费者通过订阅感兴趣的主题来接收相应的消息。

点对点模式(Point-to-Point Pattern):一种消息传递模式,生产者将消息发送到特定的队列中,而消费者从队列中接收并处理消息。

消息序列化(Message Serialization):将消息从应用程序的数据结构转换为可以在消息队列中传输和存储的格式,通常使用如JSON、XML 或二进制等格式。

消息持久化(Message Persistence):将消息保存到持久化存储中,以确保即使在消息队列或应用程序重启之后也不会丢失

在这里插入图片描述

安装kafka-go

go get github.com/segmentio/kafka-go

简单示例

package mainimport ("context""fmt""log""sync""time""github.com/segmentio/kafka-go"
)// writeByConn 基于Conn发送消息
func writeByConn(wg *sync.WaitGroup) {defer wg.Done()topic := "my-topic"partition := 0// 连接至Kafka集群的Leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))// 发送消息_, err = conn.WriteMessages(kafka.Message{Value: []byte("one!")},kafka.Message{Value: []byte("two!")},kafka.Message{Value: []byte("three!")},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn(wg *sync.WaitGroup) {defer wg.Done()// 指定要连接的topic和partitiontopic := "my-topic"partition := 0// 连接至Kafka的leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {n, err := batch.Read(b)if err != nil {break}fmt.Println(string(b[:n]))}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}func main() {var wg sync.WaitGroupwg.Add(2)go writeByConn(&wg)go readByConn(&wg)wg.Wait()
}

运行结果

one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
2023/08/25 10:14:10 failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request

可以看到,利用两个goroutine,成功在一个文件里实现了生产者写入消息,消费者消费消息并打印出来

相关文章:

go语言kafka入门

消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据 消息队列相关概念: 生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。 消费者(Consumer&…...

自定义拖拽功能,上下拖拽改变盒子高度

核心在于监听鼠标的move来改变div的高度&#xff0c;抽成了组件 <template><div ref"container" class"drag"><z-tooltip v-if"isShowIcon" effect"dark" content"格式化" placement"top-start"&…...

JavaScript Es6_4笔记

JavaScript 进阶 文章目录 JavaScript 进阶深浅拷贝浅拷贝深拷贝递归实现深拷贝js库lodash里面cloneDeep内部实现了深拷贝JSON序列化 异常处理throwtry ... catchdebugger 处理this普通函数箭头函数改变this指向callapplybind 防抖节流 深浅拷贝 浅拷贝 首先浅拷贝和深拷贝只…...

Python“牵手”易贝(Ebay)商品列表数据,关键词搜索ebayAPI接口数据,ebayAPI接口申请指南

Ebay平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范&#xff0c; EbayAPI接口是指通过编程的方式&#xff0c;让开发者能够通过HTTP协议直接访问Ebay平台的数据&#xff0c;包括商品信息、店铺信息、物流信息等&#xff0c;从而实现Ebay平…...

C语言:选择+编程(每日一练Day8)

目录 选择题&#xff1a; 题一&#xff1a; 题二&#xff1a; 题三&#xff1a; 题四&#xff1a; 题五&#xff1a; 编程题&#xff1a; 题一&#xff1a;字符个数统计 思路一&#xff1a; 题二&#xff1a;多数元素 思路一&#xff1a; 本人实力有限可能对一些…...

使用 uniapp 适用于wx小程序 - 实现移动端头部的封装和调用

图例&#xff1a;红框区域&#xff0c;使其标题区与胶囊对齐 一、组件 navigation.vue <template><view class"nav_name"><view class"nav-title" :style"{color : props.color, padding-top : toprpx,background : props.bgColor,he…...

ARM Linux 系统稳定性分析入门及渐进 13 -- gdb 反汇编 disassemble 命令详细介绍及举例】

文章目录 1.1 gdb 调试回顾1.1.1 gdb list 命令介绍 1.2 反汇编命令 dis 介绍1.2.1 如何设置 gdb 汇编代码的格式 1.1 gdb 调试回顾 在GNU调试器&#xff08;GDB&#xff09;中&#xff0c;有许多命令可以帮助我们调试应用程序。 gdb: 这是一个强大的Unix下的程序调试工具。以…...

python连接Microsoft SQL Server 数据库

python代码 Author: tkhywang 2810248865qq.com Date: 2023-08-21 11:22:24 LastEditors: tkhywang 2810248865qq.com LastEditTime: 2023-08-21 11:29:30 FilePath: \PythonProject02\Microsoft SQL Server 数据库.py Description: 这是默认设置,请设置customMade, 打开koroFi…...

docker可视化工具

安装Portainer 官方安装说明&#xff1a;https://www.portainer.io/installation/ [rootubuntu1804 ~]#docker pull portainer/portainer[rootubuntu1804 ~]#docker volume create portainer_data portainer_data [rootubuntu1804 ~]#docker run -d -p 8000:8000 -p 9000:90…...

MySQL 用户管理操作

目录 一、用户管理概述 二、用户管理 1、创建用户 2、删除用户 三、账户密码管理 1、root用户修改自己的密码 2、ROOT用户修改其他普通用户密码 3、普通用户修改自己的密码 4、ROOT用户密码忘记解决办法 1&#xff09;Linux系统 2&#xff09;windows系统 四、用户权…...

【python办公自动化】PysimpleGUI中的popup弹窗中的按钮设置居中

PysimpleGUI中的popup弹窗中的按钮设置居中 背景问题解决背景 默认的popup弹窗中的OK按钮是在最下面偏左侧一些,有时需要将按钮放置居中 问题解决 首先找到pysimplegui源代码文件中popup的部分 然后定位到19388行,源文件内容如下 关于popup弹窗OK按钮的设置,将pad属性…...

postgresql 同步流复制两个相关参数synchronous_commit 和 synchronous_standby_names

一&#xff1a;synchronous_commit 1.synchronous_commit参数含义 这个参数用来设置事务提交返回客户端之前&#xff0c;一个事务是否需要等待 WAL 记录被写入磁盘。合法的值是{local,remote_write,remote_apply,on,off}。默认设置是on。 2.各可选值含义 2.1 local 当事务…...

运算放大器发展史

在内部集成了一个补偿电容 MPS公司OP07推出后&#xff0c;大受欢迎。各家厂商都推出了自己的 这4款都是可以替换的...

LVS+Keepalived 实验

Keepalived 是什么 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题的一款检查工具 在一个LVS服务集群中通常有主服务器&#xff08;MASTER&#xff09;和备份服务器&#xff08;BACKUP&#xff09;两种角色的服务器…...

FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装

FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装 0. 界面预览1. Docker安装1.1 下载docker镜像1.2 启动docker镜像1.3 登录 2. 脚本安装2.1 下载2.2 安装2.3 登录2.4 卸载程序 3. 镜像安装3.1 下载镜像3.2 安装镜像3.3 登录 0. 界面预览 http://myfs.f3322.net…...

内网渗透神器CobaltStrike之权限提升(七)

Uac绕过 常见uac攻击模块 UAC-DLL UAC-DLL攻击模块允许攻击者从低权限的本地管理员账户获得更高的权限。这种攻击利用UAC的漏洞&#xff0c;将ArtifactKit生成的恶意DLL复制到需要特权的位置。 适用于Windows7和Windows8及更高版本的未修补版本 Uac-token-duplication 此攻…...

使用haproxy搭建web架构

haproxy HAProxy是一个免费的负载均衡软件&#xff0c;可以运行于大部分主流的Linux操作系统上。 HAProxy提供了可以在七层和四层两种负载均衡能力&#xff0c;它可以提供高可用性、负载均衡、及基于TCP和HTTP应用的代理。适用于负载大的Web站点&#xff0c;在运行在硬件上可…...

Java基础之IO流File类创建及删除

1.File类概述及构造方法 2.File类创建功能 文件创建成功&#xff01; 如果文件不存在&#xff0c;就创建文件&#xff0c;并返回true 如果文件存在&#xff0c;就不创建文件&#xff0c;并返回false 如果文件夹不存在&#xff0c;就创建文件夹&#xff0c;并返回true 如果文件…...

高速道路监控:工业路由器助力高速监控远程管理与维护

工业路由器在物联网应用中扮演着重要的角色。物联网的发展使得大量设备和传感器能够互联互通&#xff0c;而工业路由器作为连接这些设备和网络的中间桥梁&#xff0c;承担着数据传输和安全管理的重要责任。 工业路由器能够为高速监控提供网络功能&#xff0c;实现户外无线网络部…...

【校招VIP】前端基础之post和get

考点介绍&#xff1a; get和post 是网络基础&#xff0c;也是每个前端同学绕不过去的小问题&#xff0c;但是在校招面试中很多同学在基础回答中不到位&#xff0c;或者倒在引申问题里&#xff0c;就丢分了。 『前端基础之post和get』相关题目及解析内容可点击文章末尾链接查看…...

多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度​

一、引言&#xff1a;多云环境的技术复杂性本质​​ 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时&#xff0c;​​基础设施的技术债呈现指数级积累​​。网络连接、身份认证、成本管理这三大核心挑战相互嵌套&#xff1a;跨云网络构建数据…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

应用升级/灾备测试时使用guarantee 闪回点迅速回退

1.场景 应用要升级,当升级失败时,数据库回退到升级前. 要测试系统,测试完成后,数据库要回退到测试前。 相对于RMAN恢复需要很长时间&#xff0c; 数据库闪回只需要几分钟。 2.技术实现 数据库设置 2个db_recovery参数 创建guarantee闪回点&#xff0c;不需要开启数据库闪回。…...

微信小程序之bind和catch

这两个呢&#xff0c;都是绑定事件用的&#xff0c;具体使用有些小区别。 官方文档&#xff1a; 事件冒泡处理不同 bind&#xff1a;绑定的事件会向上冒泡&#xff0c;即触发当前组件的事件后&#xff0c;还会继续触发父组件的相同事件。例如&#xff0c;有一个子视图绑定了b…...

React第五十七节 Router中RouterProvider使用详解及注意事项

前言 在 React Router v6.4 中&#xff0c;RouterProvider 是一个核心组件&#xff0c;用于提供基于数据路由&#xff08;data routers&#xff09;的新型路由方案。 它替代了传统的 <BrowserRouter>&#xff0c;支持更强大的数据加载和操作功能&#xff08;如 loader 和…...

根据万维钢·精英日课6的内容,使用AI(2025)可以参考以下方法:

根据万维钢精英日课6的内容&#xff0c;使用AI&#xff08;2025&#xff09;可以参考以下方法&#xff1a; 四个洞见 模型已经比人聪明&#xff1a;以ChatGPT o3为代表的AI非常强大&#xff0c;能运用高级理论解释道理、引用最新学术论文&#xff0c;生成对顶尖科学家都有用的…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

Java + Spring Boot + Mybatis 实现批量插入

在 Java 中使用 Spring Boot 和 MyBatis 实现批量插入可以通过以下步骤完成。这里提供两种常用方法&#xff1a;使用 MyBatis 的 <foreach> 标签和批处理模式&#xff08;ExecutorType.BATCH&#xff09;。 方法一&#xff1a;使用 XML 的 <foreach> 标签&#xff…...

QT3D学习笔记——圆台、圆锥

类名作用Qt3DWindow3D渲染窗口容器QEntity场景中的实体&#xff08;对象或容器&#xff09;QCamera控制观察视角QPointLight点光源QConeMesh圆锥几何网格QTransform控制实体的位置/旋转/缩放QPhongMaterialPhong光照材质&#xff08;定义颜色、反光等&#xff09;QFirstPersonC…...

2025年渗透测试面试题总结-腾讯[实习]科恩实验室-安全工程师(题目+回答)

安全领域各种资源&#xff0c;学习文档&#xff0c;以及工具分享、前沿信息分享、POC、EXP分享。不定期分享各种好玩的项目及好用的工具&#xff0c;欢迎关注。 目录 腾讯[实习]科恩实验室-安全工程师 一、网络与协议 1. TCP三次握手 2. SYN扫描原理 3. HTTPS证书机制 二…...