当前位置: 首页 > news >正文

go语言kafka入门

消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据

消息队列相关概念:

生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。

消费者(Consumer):从消息队列中接收和处理消息的应用程序或系统组件。

主题(Topic):消息队列中用于分类和分组消息的逻辑概念,生产者将消息发送到指定的主题,而消费者可以订阅特定的主题以接收相应的消息。

队列(Queue):消息队列中存储消息的容器,遵循先进先出(FIFO)的原则。

发布-订阅模式(Publish-Subscribe Pattern):一种消息分发模式,生产者将消息发送到一个或多个主题,而消费者通过订阅感兴趣的主题来接收相应的消息。

点对点模式(Point-to-Point Pattern):一种消息传递模式,生产者将消息发送到特定的队列中,而消费者从队列中接收并处理消息。

消息序列化(Message Serialization):将消息从应用程序的数据结构转换为可以在消息队列中传输和存储的格式,通常使用如JSON、XML 或二进制等格式。

消息持久化(Message Persistence):将消息保存到持久化存储中,以确保即使在消息队列或应用程序重启之后也不会丢失

在这里插入图片描述

安装kafka-go

go get github.com/segmentio/kafka-go

简单示例

package mainimport ("context""fmt""log""sync""time""github.com/segmentio/kafka-go"
)// writeByConn 基于Conn发送消息
func writeByConn(wg *sync.WaitGroup) {defer wg.Done()topic := "my-topic"partition := 0// 连接至Kafka集群的Leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))// 发送消息_, err = conn.WriteMessages(kafka.Message{Value: []byte("one!")},kafka.Message{Value: []byte("two!")},kafka.Message{Value: []byte("three!")},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn(wg *sync.WaitGroup) {defer wg.Done()// 指定要连接的topic和partitiontopic := "my-topic"partition := 0// 连接至Kafka的leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {n, err := batch.Read(b)if err != nil {break}fmt.Println(string(b[:n]))}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}func main() {var wg sync.WaitGroupwg.Add(2)go writeByConn(&wg)go readByConn(&wg)wg.Wait()
}

运行结果

one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
2023/08/25 10:14:10 failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request

可以看到,利用两个goroutine,成功在一个文件里实现了生产者写入消息,消费者消费消息并打印出来

相关文章:

go语言kafka入门

消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据 消息队列相关概念: 生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。 消费者(Consumer&…...

自定义拖拽功能,上下拖拽改变盒子高度

核心在于监听鼠标的move来改变div的高度&#xff0c;抽成了组件 <template><div ref"container" class"drag"><z-tooltip v-if"isShowIcon" effect"dark" content"格式化" placement"top-start"&…...

JavaScript Es6_4笔记

JavaScript 进阶 文章目录 JavaScript 进阶深浅拷贝浅拷贝深拷贝递归实现深拷贝js库lodash里面cloneDeep内部实现了深拷贝JSON序列化 异常处理throwtry ... catchdebugger 处理this普通函数箭头函数改变this指向callapplybind 防抖节流 深浅拷贝 浅拷贝 首先浅拷贝和深拷贝只…...

Python“牵手”易贝(Ebay)商品列表数据,关键词搜索ebayAPI接口数据,ebayAPI接口申请指南

Ebay平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范&#xff0c; EbayAPI接口是指通过编程的方式&#xff0c;让开发者能够通过HTTP协议直接访问Ebay平台的数据&#xff0c;包括商品信息、店铺信息、物流信息等&#xff0c;从而实现Ebay平…...

C语言:选择+编程(每日一练Day8)

目录 选择题&#xff1a; 题一&#xff1a; 题二&#xff1a; 题三&#xff1a; 题四&#xff1a; 题五&#xff1a; 编程题&#xff1a; 题一&#xff1a;字符个数统计 思路一&#xff1a; 题二&#xff1a;多数元素 思路一&#xff1a; 本人实力有限可能对一些…...

使用 uniapp 适用于wx小程序 - 实现移动端头部的封装和调用

图例&#xff1a;红框区域&#xff0c;使其标题区与胶囊对齐 一、组件 navigation.vue <template><view class"nav_name"><view class"nav-title" :style"{color : props.color, padding-top : toprpx,background : props.bgColor,he…...

ARM Linux 系统稳定性分析入门及渐进 13 -- gdb 反汇编 disassemble 命令详细介绍及举例】

文章目录 1.1 gdb 调试回顾1.1.1 gdb list 命令介绍 1.2 反汇编命令 dis 介绍1.2.1 如何设置 gdb 汇编代码的格式 1.1 gdb 调试回顾 在GNU调试器&#xff08;GDB&#xff09;中&#xff0c;有许多命令可以帮助我们调试应用程序。 gdb: 这是一个强大的Unix下的程序调试工具。以…...

python连接Microsoft SQL Server 数据库

python代码 Author: tkhywang 2810248865qq.com Date: 2023-08-21 11:22:24 LastEditors: tkhywang 2810248865qq.com LastEditTime: 2023-08-21 11:29:30 FilePath: \PythonProject02\Microsoft SQL Server 数据库.py Description: 这是默认设置,请设置customMade, 打开koroFi…...

docker可视化工具

安装Portainer 官方安装说明&#xff1a;https://www.portainer.io/installation/ [rootubuntu1804 ~]#docker pull portainer/portainer[rootubuntu1804 ~]#docker volume create portainer_data portainer_data [rootubuntu1804 ~]#docker run -d -p 8000:8000 -p 9000:90…...

MySQL 用户管理操作

目录 一、用户管理概述 二、用户管理 1、创建用户 2、删除用户 三、账户密码管理 1、root用户修改自己的密码 2、ROOT用户修改其他普通用户密码 3、普通用户修改自己的密码 4、ROOT用户密码忘记解决办法 1&#xff09;Linux系统 2&#xff09;windows系统 四、用户权…...

【python办公自动化】PysimpleGUI中的popup弹窗中的按钮设置居中

PysimpleGUI中的popup弹窗中的按钮设置居中 背景问题解决背景 默认的popup弹窗中的OK按钮是在最下面偏左侧一些,有时需要将按钮放置居中 问题解决 首先找到pysimplegui源代码文件中popup的部分 然后定位到19388行,源文件内容如下 关于popup弹窗OK按钮的设置,将pad属性…...

postgresql 同步流复制两个相关参数synchronous_commit 和 synchronous_standby_names

一&#xff1a;synchronous_commit 1.synchronous_commit参数含义 这个参数用来设置事务提交返回客户端之前&#xff0c;一个事务是否需要等待 WAL 记录被写入磁盘。合法的值是{local,remote_write,remote_apply,on,off}。默认设置是on。 2.各可选值含义 2.1 local 当事务…...

运算放大器发展史

在内部集成了一个补偿电容 MPS公司OP07推出后&#xff0c;大受欢迎。各家厂商都推出了自己的 这4款都是可以替换的...

LVS+Keepalived 实验

Keepalived 是什么 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题的一款检查工具 在一个LVS服务集群中通常有主服务器&#xff08;MASTER&#xff09;和备份服务器&#xff08;BACKUP&#xff09;两种角色的服务器…...

FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装

FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装 0. 界面预览1. Docker安装1.1 下载docker镜像1.2 启动docker镜像1.3 登录 2. 脚本安装2.1 下载2.2 安装2.3 登录2.4 卸载程序 3. 镜像安装3.1 下载镜像3.2 安装镜像3.3 登录 0. 界面预览 http://myfs.f3322.net…...

内网渗透神器CobaltStrike之权限提升(七)

Uac绕过 常见uac攻击模块 UAC-DLL UAC-DLL攻击模块允许攻击者从低权限的本地管理员账户获得更高的权限。这种攻击利用UAC的漏洞&#xff0c;将ArtifactKit生成的恶意DLL复制到需要特权的位置。 适用于Windows7和Windows8及更高版本的未修补版本 Uac-token-duplication 此攻…...

使用haproxy搭建web架构

haproxy HAProxy是一个免费的负载均衡软件&#xff0c;可以运行于大部分主流的Linux操作系统上。 HAProxy提供了可以在七层和四层两种负载均衡能力&#xff0c;它可以提供高可用性、负载均衡、及基于TCP和HTTP应用的代理。适用于负载大的Web站点&#xff0c;在运行在硬件上可…...

Java基础之IO流File类创建及删除

1.File类概述及构造方法 2.File类创建功能 文件创建成功&#xff01; 如果文件不存在&#xff0c;就创建文件&#xff0c;并返回true 如果文件存在&#xff0c;就不创建文件&#xff0c;并返回false 如果文件夹不存在&#xff0c;就创建文件夹&#xff0c;并返回true 如果文件…...

高速道路监控:工业路由器助力高速监控远程管理与维护

工业路由器在物联网应用中扮演着重要的角色。物联网的发展使得大量设备和传感器能够互联互通&#xff0c;而工业路由器作为连接这些设备和网络的中间桥梁&#xff0c;承担着数据传输和安全管理的重要责任。 工业路由器能够为高速监控提供网络功能&#xff0c;实现户外无线网络部…...

【校招VIP】前端基础之post和get

考点介绍&#xff1a; get和post 是网络基础&#xff0c;也是每个前端同学绕不过去的小问题&#xff0c;但是在校招面试中很多同学在基础回答中不到位&#xff0c;或者倒在引申问题里&#xff0c;就丢分了。 『前端基础之post和get』相关题目及解析内容可点击文章末尾链接查看…...

VB.net复制Ntag213卡写入UID

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?ftt&id615391857885 一、读取旧Ntag卡的UID和数据 Private Sub Button15_Click(sender As Object, e As EventArgs) Handles Button15.Click轻松读卡技术支持:网站:Dim i, j As IntegerDim cardidhex, …...

UDP(Echoserver)

网络命令 Ping 命令 检测网络是否连通 使用方法: ping -c 次数 网址ping -c 3 www.baidu.comnetstat 命令 netstat 是一个用来查看网络状态的重要工具. 语法&#xff1a;netstat [选项] 功能&#xff1a;查看网络状态 常用选项&#xff1a; n 拒绝显示别名&#…...

Springcloud:Eureka 高可用集群搭建实战(服务注册与发现的底层原理与避坑指南)

引言&#xff1a;为什么 Eureka 依然是存量系统的核心&#xff1f; 尽管 Nacos 等新注册中心崛起&#xff0c;但金融、电力等保守行业仍有大量系统运行在 Eureka 上。理解其高可用设计与自我保护机制&#xff0c;是保障分布式系统稳定的必修课。本文将手把手带你搭建生产级 Eur…...

【python异步多线程】异步多线程爬虫代码示例

claude生成的python多线程、异步代码示例&#xff0c;模拟20个网页的爬取&#xff0c;每个网页假设要0.5-2秒完成。 代码 Python多线程爬虫教程 核心概念 多线程&#xff1a;允许程序同时执行多个任务&#xff0c;提高IO密集型任务&#xff08;如网络请求&#xff09;的效率…...

RabbitMQ入门4.1.0版本(基于java、SpringBoot操作)

RabbitMQ 一、RabbitMQ概述 RabbitMQ RabbitMQ最初由LShift和CohesiveFT于2007年开发&#xff0c;后来由Pivotal Software Inc.&#xff08;现为VMware子公司&#xff09;接管。RabbitMQ 是一个开源的消息代理和队列服务器&#xff0c;用 Erlang 语言编写。广泛应用于各种分布…...

C# winform教程(二)----checkbox

一、作用 提供一个用户选择或者不选的状态&#xff0c;这是一个可以多选的控件。 二、属性 其实功能大差不差&#xff0c;除了特殊的几个外&#xff0c;与button基本相同&#xff0c;所有说几个独有的 checkbox属性 名称内容含义appearance控件外观可以变成按钮形状checkali…...

P10909 [蓝桥杯 2024 国 B] 立定跳远

# P10909 [蓝桥杯 2024 国 B] 立定跳远 ## 题目描述 在运动会上&#xff0c;小明从数轴的原点开始向正方向立定跳远。项目设置了 $n$ 个检查点 $a_1, a_2, \cdots , a_n$ 且 $a_i \ge a_{i−1} > 0$。小明必须先后跳跃到每个检查点上且只能跳跃到检查点上。同时&#xff0…...

VSCode 没有添加Windows右键菜单

关键字&#xff1a;VSCode&#xff1b;Windows右键菜单&#xff1b;注册表。 文章目录 前言一、工程环境二、配置流程1.右键文件打开2.右键文件夹打开3.右键空白处打开文件夹 三、测试总结 前言 安装 VSCode 时没有注意&#xff0c;实际使用的时候发现 VSCode 在 Windows 菜单栏…...

第14节 Node.js 全局对象

JavaScript 中有一个特殊的对象&#xff0c;称为全局对象&#xff08;Global Object&#xff09;&#xff0c;它及其所有属性都可以在程序的任何地方访问&#xff0c;即全局变量。 在浏览器 JavaScript 中&#xff0c;通常 window 是全局对象&#xff0c; 而 Node.js 中的全局…...

Spring是如何实现无代理对象的循环依赖

无代理对象的循环依赖 什么是循环依赖解决方案实现方式测试验证 引入代理对象的影响创建代理对象问题分析 源码见&#xff1a;mini-spring 什么是循环依赖 循环依赖是指在对象创建过程中&#xff0c;两个或多个对象相互依赖&#xff0c;导致创建过程陷入死循环。以下通过一个简…...