go语言kafka入门
消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据
消息队列相关概念:
生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。
消费者(Consumer):从消息队列中接收和处理消息的应用程序或系统组件。
主题(Topic):消息队列中用于分类和分组消息的逻辑概念,生产者将消息发送到指定的主题,而消费者可以订阅特定的主题以接收相应的消息。
队列(Queue):消息队列中存储消息的容器,遵循先进先出(FIFO)的原则。
发布-订阅模式(Publish-Subscribe Pattern):一种消息分发模式,生产者将消息发送到一个或多个主题,而消费者通过订阅感兴趣的主题来接收相应的消息。
点对点模式(Point-to-Point Pattern):一种消息传递模式,生产者将消息发送到特定的队列中,而消费者从队列中接收并处理消息。
消息序列化(Message Serialization):将消息从应用程序的数据结构转换为可以在消息队列中传输和存储的格式,通常使用如JSON、XML 或二进制等格式。
消息持久化(Message Persistence):将消息保存到持久化存储中,以确保即使在消息队列或应用程序重启之后也不会丢失

安装kafka-go
go get github.com/segmentio/kafka-go
简单示例
package mainimport ("context""fmt""log""sync""time""github.com/segmentio/kafka-go"
)// writeByConn 基于Conn发送消息
func writeByConn(wg *sync.WaitGroup) {defer wg.Done()topic := "my-topic"partition := 0// 连接至Kafka集群的Leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))// 发送消息_, err = conn.WriteMessages(kafka.Message{Value: []byte("one!")},kafka.Message{Value: []byte("two!")},kafka.Message{Value: []byte("three!")},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn(wg *sync.WaitGroup) {defer wg.Done()// 指定要连接的topic和partitiontopic := "my-topic"partition := 0// 连接至Kafka的leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {n, err := batch.Read(b)if err != nil {break}fmt.Println(string(b[:n]))}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}func main() {var wg sync.WaitGroupwg.Add(2)go writeByConn(&wg)go readByConn(&wg)wg.Wait()
}
运行结果
one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
2023/08/25 10:14:10 failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request
可以看到,利用两个goroutine,成功在一个文件里实现了生产者写入消息,消费者消费消息并打印出来
相关文章:
go语言kafka入门
消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据 消息队列相关概念: 生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。 消费者(Consumer&…...
自定义拖拽功能,上下拖拽改变盒子高度
核心在于监听鼠标的move来改变div的高度,抽成了组件 <template><div ref"container" class"drag"><z-tooltip v-if"isShowIcon" effect"dark" content"格式化" placement"top-start"&…...
JavaScript Es6_4笔记
JavaScript 进阶 文章目录 JavaScript 进阶深浅拷贝浅拷贝深拷贝递归实现深拷贝js库lodash里面cloneDeep内部实现了深拷贝JSON序列化 异常处理throwtry ... catchdebugger 处理this普通函数箭头函数改变this指向callapplybind 防抖节流 深浅拷贝 浅拷贝 首先浅拷贝和深拷贝只…...
Python“牵手”易贝(Ebay)商品列表数据,关键词搜索ebayAPI接口数据,ebayAPI接口申请指南
Ebay平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范, EbayAPI接口是指通过编程的方式,让开发者能够通过HTTP协议直接访问Ebay平台的数据,包括商品信息、店铺信息、物流信息等,从而实现Ebay平…...
C语言:选择+编程(每日一练Day8)
目录 选择题: 题一: 题二: 题三: 题四: 题五: 编程题: 题一:字符个数统计 思路一: 题二:多数元素 思路一: 本人实力有限可能对一些…...
使用 uniapp 适用于wx小程序 - 实现移动端头部的封装和调用
图例:红框区域,使其标题区与胶囊对齐 一、组件 navigation.vue <template><view class"nav_name"><view class"nav-title" :style"{color : props.color, padding-top : toprpx,background : props.bgColor,he…...
ARM Linux 系统稳定性分析入门及渐进 13 -- gdb 反汇编 disassemble 命令详细介绍及举例】
文章目录 1.1 gdb 调试回顾1.1.1 gdb list 命令介绍 1.2 反汇编命令 dis 介绍1.2.1 如何设置 gdb 汇编代码的格式 1.1 gdb 调试回顾 在GNU调试器(GDB)中,有许多命令可以帮助我们调试应用程序。 gdb: 这是一个强大的Unix下的程序调试工具。以…...
python连接Microsoft SQL Server 数据库
python代码 Author: tkhywang 2810248865qq.com Date: 2023-08-21 11:22:24 LastEditors: tkhywang 2810248865qq.com LastEditTime: 2023-08-21 11:29:30 FilePath: \PythonProject02\Microsoft SQL Server 数据库.py Description: 这是默认设置,请设置customMade, 打开koroFi…...
docker可视化工具
安装Portainer 官方安装说明:https://www.portainer.io/installation/ [rootubuntu1804 ~]#docker pull portainer/portainer[rootubuntu1804 ~]#docker volume create portainer_data portainer_data [rootubuntu1804 ~]#docker run -d -p 8000:8000 -p 9000:90…...
MySQL 用户管理操作
目录 一、用户管理概述 二、用户管理 1、创建用户 2、删除用户 三、账户密码管理 1、root用户修改自己的密码 2、ROOT用户修改其他普通用户密码 3、普通用户修改自己的密码 4、ROOT用户密码忘记解决办法 1)Linux系统 2)windows系统 四、用户权…...
【python办公自动化】PysimpleGUI中的popup弹窗中的按钮设置居中
PysimpleGUI中的popup弹窗中的按钮设置居中 背景问题解决背景 默认的popup弹窗中的OK按钮是在最下面偏左侧一些,有时需要将按钮放置居中 问题解决 首先找到pysimplegui源代码文件中popup的部分 然后定位到19388行,源文件内容如下 关于popup弹窗OK按钮的设置,将pad属性…...
postgresql 同步流复制两个相关参数synchronous_commit 和 synchronous_standby_names
一:synchronous_commit 1.synchronous_commit参数含义 这个参数用来设置事务提交返回客户端之前,一个事务是否需要等待 WAL 记录被写入磁盘。合法的值是{local,remote_write,remote_apply,on,off}。默认设置是on。 2.各可选值含义 2.1 local 当事务…...
运算放大器发展史
在内部集成了一个补偿电容 MPS公司OP07推出后,大受欢迎。各家厂商都推出了自己的 这4款都是可以替换的...
LVS+Keepalived 实验
Keepalived 是什么 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案,可以解决静态路由出现的单点故障问题的一款检查工具 在一个LVS服务集群中通常有主服务器(MASTER)和备份服务器(BACKUP)两种角色的服务器…...
FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装
FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装 0. 界面预览1. Docker安装1.1 下载docker镜像1.2 启动docker镜像1.3 登录 2. 脚本安装2.1 下载2.2 安装2.3 登录2.4 卸载程序 3. 镜像安装3.1 下载镜像3.2 安装镜像3.3 登录 0. 界面预览 http://myfs.f3322.net…...
内网渗透神器CobaltStrike之权限提升(七)
Uac绕过 常见uac攻击模块 UAC-DLL UAC-DLL攻击模块允许攻击者从低权限的本地管理员账户获得更高的权限。这种攻击利用UAC的漏洞,将ArtifactKit生成的恶意DLL复制到需要特权的位置。 适用于Windows7和Windows8及更高版本的未修补版本 Uac-token-duplication 此攻…...
使用haproxy搭建web架构
haproxy HAProxy是一个免费的负载均衡软件,可以运行于大部分主流的Linux操作系统上。 HAProxy提供了可以在七层和四层两种负载均衡能力,它可以提供高可用性、负载均衡、及基于TCP和HTTP应用的代理。适用于负载大的Web站点,在运行在硬件上可…...
Java基础之IO流File类创建及删除
1.File类概述及构造方法 2.File类创建功能 文件创建成功! 如果文件不存在,就创建文件,并返回true 如果文件存在,就不创建文件,并返回false 如果文件夹不存在,就创建文件夹,并返回true 如果文件…...
高速道路监控:工业路由器助力高速监控远程管理与维护
工业路由器在物联网应用中扮演着重要的角色。物联网的发展使得大量设备和传感器能够互联互通,而工业路由器作为连接这些设备和网络的中间桥梁,承担着数据传输和安全管理的重要责任。 工业路由器能够为高速监控提供网络功能,实现户外无线网络部…...
【校招VIP】前端基础之post和get
考点介绍: get和post 是网络基础,也是每个前端同学绕不过去的小问题,但是在校招面试中很多同学在基础回答中不到位,或者倒在引申问题里,就丢分了。 『前端基础之post和get』相关题目及解析内容可点击文章末尾链接查看…...
eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)
说明: 想象一下,你正在用eNSP搭建一个虚拟的网络世界,里面有虚拟的路由器、交换机、电脑(PC)等等。这些设备都在你的电脑里面“运行”,它们之间可以互相通信,就像一个封闭的小王国。 但是&#…...
React 第五十五节 Router 中 useAsyncError的使用详解
前言 useAsyncError 是 React Router v6.4 引入的一个钩子,用于处理异步操作(如数据加载)中的错误。下面我将详细解释其用途并提供代码示例。 一、useAsyncError 用途 处理异步错误:捕获在 loader 或 action 中发生的异步错误替…...
SkyWalking 10.2.0 SWCK 配置过程
SkyWalking 10.2.0 & SWCK 配置过程 skywalking oap-server & ui 使用Docker安装在K8S集群以外,K8S集群中的微服务使用initContainer按命名空间将skywalking-java-agent注入到业务容器中。 SWCK有整套的解决方案,全安装在K8S群集中。 具体可参…...
Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...
Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务
通过akshare库,获取股票数据,并生成TabPFN这个模型 可以识别、处理的格式,写一个完整的预处理示例,并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务,进行预测并输…...
苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
涂鸦T5AI手搓语音、emoji、otto机器人从入门到实战
“🤖手搓TuyaAI语音指令 😍秒变表情包大师,让萌系Otto机器人🔥玩出智能新花样!开整!” 🤖 Otto机器人 → 直接点明主体 手搓TuyaAI语音 → 强调 自主编程/自定义 语音控制(TuyaAI…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
【开发技术】.Net使用FFmpeg视频特定帧上绘制内容
目录 一、目的 二、解决方案 2.1 什么是FFmpeg 2.2 FFmpeg主要功能 2.3 使用Xabe.FFmpeg调用FFmpeg功能 2.4 使用 FFmpeg 的 drawbox 滤镜来绘制 ROI 三、总结 一、目的 当前市场上有很多目标检测智能识别的相关算法,当前调用一个医疗行业的AI识别算法后返回…...
