当前位置: 首页 > news >正文

go语言kafka入门

消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据

消息队列相关概念:

生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。

消费者(Consumer):从消息队列中接收和处理消息的应用程序或系统组件。

主题(Topic):消息队列中用于分类和分组消息的逻辑概念,生产者将消息发送到指定的主题,而消费者可以订阅特定的主题以接收相应的消息。

队列(Queue):消息队列中存储消息的容器,遵循先进先出(FIFO)的原则。

发布-订阅模式(Publish-Subscribe Pattern):一种消息分发模式,生产者将消息发送到一个或多个主题,而消费者通过订阅感兴趣的主题来接收相应的消息。

点对点模式(Point-to-Point Pattern):一种消息传递模式,生产者将消息发送到特定的队列中,而消费者从队列中接收并处理消息。

消息序列化(Message Serialization):将消息从应用程序的数据结构转换为可以在消息队列中传输和存储的格式,通常使用如JSON、XML 或二进制等格式。

消息持久化(Message Persistence):将消息保存到持久化存储中,以确保即使在消息队列或应用程序重启之后也不会丢失

在这里插入图片描述

安装kafka-go

go get github.com/segmentio/kafka-go

简单示例

package mainimport ("context""fmt""log""sync""time""github.com/segmentio/kafka-go"
)// writeByConn 基于Conn发送消息
func writeByConn(wg *sync.WaitGroup) {defer wg.Done()topic := "my-topic"partition := 0// 连接至Kafka集群的Leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))// 发送消息_, err = conn.WriteMessages(kafka.Message{Value: []byte("one!")},kafka.Message{Value: []byte("two!")},kafka.Message{Value: []byte("three!")},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn(wg *sync.WaitGroup) {defer wg.Done()// 指定要连接的topic和partitiontopic := "my-topic"partition := 0// 连接至Kafka的leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {n, err := batch.Read(b)if err != nil {break}fmt.Println(string(b[:n]))}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}func main() {var wg sync.WaitGroupwg.Add(2)go writeByConn(&wg)go readByConn(&wg)wg.Wait()
}

运行结果

one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
2023/08/25 10:14:10 failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request

可以看到,利用两个goroutine,成功在一个文件里实现了生产者写入消息,消费者消费消息并打印出来

相关文章:

go语言kafka入门

消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据 消息队列相关概念: 生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。 消费者(Consumer&…...

自定义拖拽功能,上下拖拽改变盒子高度

核心在于监听鼠标的move来改变div的高度&#xff0c;抽成了组件 <template><div ref"container" class"drag"><z-tooltip v-if"isShowIcon" effect"dark" content"格式化" placement"top-start"&…...

JavaScript Es6_4笔记

JavaScript 进阶 文章目录 JavaScript 进阶深浅拷贝浅拷贝深拷贝递归实现深拷贝js库lodash里面cloneDeep内部实现了深拷贝JSON序列化 异常处理throwtry ... catchdebugger 处理this普通函数箭头函数改变this指向callapplybind 防抖节流 深浅拷贝 浅拷贝 首先浅拷贝和深拷贝只…...

Python“牵手”易贝(Ebay)商品列表数据,关键词搜索ebayAPI接口数据,ebayAPI接口申请指南

Ebay平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范&#xff0c; EbayAPI接口是指通过编程的方式&#xff0c;让开发者能够通过HTTP协议直接访问Ebay平台的数据&#xff0c;包括商品信息、店铺信息、物流信息等&#xff0c;从而实现Ebay平…...

C语言:选择+编程(每日一练Day8)

目录 选择题&#xff1a; 题一&#xff1a; 题二&#xff1a; 题三&#xff1a; 题四&#xff1a; 题五&#xff1a; 编程题&#xff1a; 题一&#xff1a;字符个数统计 思路一&#xff1a; 题二&#xff1a;多数元素 思路一&#xff1a; 本人实力有限可能对一些…...

使用 uniapp 适用于wx小程序 - 实现移动端头部的封装和调用

图例&#xff1a;红框区域&#xff0c;使其标题区与胶囊对齐 一、组件 navigation.vue <template><view class"nav_name"><view class"nav-title" :style"{color : props.color, padding-top : toprpx,background : props.bgColor,he…...

ARM Linux 系统稳定性分析入门及渐进 13 -- gdb 反汇编 disassemble 命令详细介绍及举例】

文章目录 1.1 gdb 调试回顾1.1.1 gdb list 命令介绍 1.2 反汇编命令 dis 介绍1.2.1 如何设置 gdb 汇编代码的格式 1.1 gdb 调试回顾 在GNU调试器&#xff08;GDB&#xff09;中&#xff0c;有许多命令可以帮助我们调试应用程序。 gdb: 这是一个强大的Unix下的程序调试工具。以…...

python连接Microsoft SQL Server 数据库

python代码 Author: tkhywang 2810248865qq.com Date: 2023-08-21 11:22:24 LastEditors: tkhywang 2810248865qq.com LastEditTime: 2023-08-21 11:29:30 FilePath: \PythonProject02\Microsoft SQL Server 数据库.py Description: 这是默认设置,请设置customMade, 打开koroFi…...

docker可视化工具

安装Portainer 官方安装说明&#xff1a;https://www.portainer.io/installation/ [rootubuntu1804 ~]#docker pull portainer/portainer[rootubuntu1804 ~]#docker volume create portainer_data portainer_data [rootubuntu1804 ~]#docker run -d -p 8000:8000 -p 9000:90…...

MySQL 用户管理操作

目录 一、用户管理概述 二、用户管理 1、创建用户 2、删除用户 三、账户密码管理 1、root用户修改自己的密码 2、ROOT用户修改其他普通用户密码 3、普通用户修改自己的密码 4、ROOT用户密码忘记解决办法 1&#xff09;Linux系统 2&#xff09;windows系统 四、用户权…...

【python办公自动化】PysimpleGUI中的popup弹窗中的按钮设置居中

PysimpleGUI中的popup弹窗中的按钮设置居中 背景问题解决背景 默认的popup弹窗中的OK按钮是在最下面偏左侧一些,有时需要将按钮放置居中 问题解决 首先找到pysimplegui源代码文件中popup的部分 然后定位到19388行,源文件内容如下 关于popup弹窗OK按钮的设置,将pad属性…...

postgresql 同步流复制两个相关参数synchronous_commit 和 synchronous_standby_names

一&#xff1a;synchronous_commit 1.synchronous_commit参数含义 这个参数用来设置事务提交返回客户端之前&#xff0c;一个事务是否需要等待 WAL 记录被写入磁盘。合法的值是{local,remote_write,remote_apply,on,off}。默认设置是on。 2.各可选值含义 2.1 local 当事务…...

运算放大器发展史

在内部集成了一个补偿电容 MPS公司OP07推出后&#xff0c;大受欢迎。各家厂商都推出了自己的 这4款都是可以替换的...

LVS+Keepalived 实验

Keepalived 是什么 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题的一款检查工具 在一个LVS服务集群中通常有主服务器&#xff08;MASTER&#xff09;和备份服务器&#xff08;BACKUP&#xff09;两种角色的服务器…...

FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装

FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装 0. 界面预览1. Docker安装1.1 下载docker镜像1.2 启动docker镜像1.3 登录 2. 脚本安装2.1 下载2.2 安装2.3 登录2.4 卸载程序 3. 镜像安装3.1 下载镜像3.2 安装镜像3.3 登录 0. 界面预览 http://myfs.f3322.net…...

内网渗透神器CobaltStrike之权限提升(七)

Uac绕过 常见uac攻击模块 UAC-DLL UAC-DLL攻击模块允许攻击者从低权限的本地管理员账户获得更高的权限。这种攻击利用UAC的漏洞&#xff0c;将ArtifactKit生成的恶意DLL复制到需要特权的位置。 适用于Windows7和Windows8及更高版本的未修补版本 Uac-token-duplication 此攻…...

使用haproxy搭建web架构

haproxy HAProxy是一个免费的负载均衡软件&#xff0c;可以运行于大部分主流的Linux操作系统上。 HAProxy提供了可以在七层和四层两种负载均衡能力&#xff0c;它可以提供高可用性、负载均衡、及基于TCP和HTTP应用的代理。适用于负载大的Web站点&#xff0c;在运行在硬件上可…...

Java基础之IO流File类创建及删除

1.File类概述及构造方法 2.File类创建功能 文件创建成功&#xff01; 如果文件不存在&#xff0c;就创建文件&#xff0c;并返回true 如果文件存在&#xff0c;就不创建文件&#xff0c;并返回false 如果文件夹不存在&#xff0c;就创建文件夹&#xff0c;并返回true 如果文件…...

高速道路监控:工业路由器助力高速监控远程管理与维护

工业路由器在物联网应用中扮演着重要的角色。物联网的发展使得大量设备和传感器能够互联互通&#xff0c;而工业路由器作为连接这些设备和网络的中间桥梁&#xff0c;承担着数据传输和安全管理的重要责任。 工业路由器能够为高速监控提供网络功能&#xff0c;实现户外无线网络部…...

【校招VIP】前端基础之post和get

考点介绍&#xff1a; get和post 是网络基础&#xff0c;也是每个前端同学绕不过去的小问题&#xff0c;但是在校招面试中很多同学在基础回答中不到位&#xff0c;或者倒在引申问题里&#xff0c;就丢分了。 『前端基础之post和get』相关题目及解析内容可点击文章末尾链接查看…...

浅谈 React Hooks

React Hooks 是 React 16.8 引入的一组 API&#xff0c;用于在函数组件中使用 state 和其他 React 特性&#xff08;例如生命周期方法、context 等&#xff09;。Hooks 通过简洁的函数接口&#xff0c;解决了状态与 UI 的高度解耦&#xff0c;通过函数式编程范式实现更灵活 Rea…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例

使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件&#xff0c;常用于在两个集合之间进行数据转移&#xff0c;如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model&#xff1a;绑定右侧列表的值&…...

【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)

可以使用Sqliteviz这个网站免费编写sql语句&#xff0c;它能够让用户直接在浏览器内练习SQL的语法&#xff0c;不需要安装任何软件。 链接如下&#xff1a; sqliteviz 注意&#xff1a; 在转写SQL语法时&#xff0c;关键字之间有一个特定的顺序&#xff0c;这个顺序会影响到…...

页面渲染流程与性能优化

页面渲染流程与性能优化详解&#xff08;完整版&#xff09; 一、现代浏览器渲染流程&#xff08;详细说明&#xff09; 1. 构建DOM树 浏览器接收到HTML文档后&#xff0c;会逐步解析并构建DOM&#xff08;Document Object Model&#xff09;树。具体过程如下&#xff1a; (…...

【Zephyr 系列 10】实战项目:打造一个蓝牙传感器终端 + 网关系统(完整架构与全栈实现)

🧠关键词:Zephyr、BLE、终端、网关、广播、连接、传感器、数据采集、低功耗、系统集成 📌目标读者:希望基于 Zephyr 构建 BLE 系统架构、实现终端与网关协作、具备产品交付能力的开发者 📊篇幅字数:约 5200 字 ✨ 项目总览 在物联网实际项目中,**“终端 + 网关”**是…...

爬虫基础学习day2

# 爬虫设计领域 工商&#xff1a;企查查、天眼查短视频&#xff1a;抖音、快手、西瓜 ---> 飞瓜电商&#xff1a;京东、淘宝、聚美优品、亚马逊 ---> 分析店铺经营决策标题、排名航空&#xff1a;抓取所有航空公司价格 ---> 去哪儿自媒体&#xff1a;采集自媒体数据进…...

如何在网页里填写 PDF 表格?

有时候&#xff0c;你可能希望用户能在你的网站上填写 PDF 表单。然而&#xff0c;这件事并不简单&#xff0c;因为 PDF 并不是一种原生的网页格式。虽然浏览器可以显示 PDF 文件&#xff0c;但原生并不支持编辑或填写它们。更糟的是&#xff0c;如果你想收集表单数据&#xff…...

关键领域软件测试的突围之路:如何破解安全与效率的平衡难题

在数字化浪潮席卷全球的今天&#xff0c;软件系统已成为国家关键领域的核心战斗力。不同于普通商业软件&#xff0c;这些承载着国家安全使命的软件系统面临着前所未有的质量挑战——如何在确保绝对安全的前提下&#xff0c;实现高效测试与快速迭代&#xff1f;这一命题正考验着…...

Android第十三次面试总结(四大 组件基础)

Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成&#xff0c;用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机&#xff1a; ​onCreate()​​ ​调用时机​&#xff1a;Activity 首次创建时调用。​…...