当前位置: 首页 > news >正文

go语言kafka入门

消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据

消息队列相关概念:

生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。

消费者(Consumer):从消息队列中接收和处理消息的应用程序或系统组件。

主题(Topic):消息队列中用于分类和分组消息的逻辑概念,生产者将消息发送到指定的主题,而消费者可以订阅特定的主题以接收相应的消息。

队列(Queue):消息队列中存储消息的容器,遵循先进先出(FIFO)的原则。

发布-订阅模式(Publish-Subscribe Pattern):一种消息分发模式,生产者将消息发送到一个或多个主题,而消费者通过订阅感兴趣的主题来接收相应的消息。

点对点模式(Point-to-Point Pattern):一种消息传递模式,生产者将消息发送到特定的队列中,而消费者从队列中接收并处理消息。

消息序列化(Message Serialization):将消息从应用程序的数据结构转换为可以在消息队列中传输和存储的格式,通常使用如JSON、XML 或二进制等格式。

消息持久化(Message Persistence):将消息保存到持久化存储中,以确保即使在消息队列或应用程序重启之后也不会丢失

在这里插入图片描述

安装kafka-go

go get github.com/segmentio/kafka-go

简单示例

package mainimport ("context""fmt""log""sync""time""github.com/segmentio/kafka-go"
)// writeByConn 基于Conn发送消息
func writeByConn(wg *sync.WaitGroup) {defer wg.Done()topic := "my-topic"partition := 0// 连接至Kafka集群的Leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置发送消息的超时时间conn.SetWriteDeadline(time.Now().Add(10 * time.Second))// 发送消息_, err = conn.WriteMessages(kafka.Message{Value: []byte("one!")},kafka.Message{Value: []byte("two!")},kafka.Message{Value: []byte("three!")},)if err != nil {log.Fatal("failed to write messages:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close writer:", err)}
}// readByConn 连接至kafka后接收消息
func readByConn(wg *sync.WaitGroup) {defer wg.Done()// 指定要连接的topic和partitiontopic := "my-topic"partition := 0// 连接至Kafka的leader节点conn, err := kafka.DialLeader(context.Background(), "tcp", "localhost:9092", topic, partition)if err != nil {log.Fatal("failed to dial leader:", err)}// 设置读取超时时间conn.SetReadDeadline(time.Now().Add(10 * time.Second))// 读取一批消息,得到的batch是一系列消息的迭代器batch := conn.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max// 遍历读取消息b := make([]byte, 10e3) // 10KB max per messagefor {n, err := batch.Read(b)if err != nil {break}fmt.Println(string(b[:n]))}// 关闭batchif err := batch.Close(); err != nil {log.Fatal("failed to close batch:", err)}// 关闭连接if err := conn.Close(); err != nil {log.Fatal("failed to close connection:", err)}
}func main() {var wg sync.WaitGroupwg.Add(2)go writeByConn(&wg)go readByConn(&wg)wg.Wait()
}

运行结果

one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
one!
two!
three!
2023/08/25 10:14:10 failed to close batch:[7] Request Timed Out: the request exceeded the user-specified time limit in the request

可以看到,利用两个goroutine,成功在一个文件里实现了生产者写入消息,消费者消费消息并打印出来

相关文章:

go语言kafka入门

消息队列:一种基于异步通信的解耦机制,用于在应用程序或系统组件之间传递消息和数据 消息队列相关概念: 生产者(Producer):生成并发送消息到消息队列中的应用程序或系统组件。 消费者(Consumer&…...

自定义拖拽功能,上下拖拽改变盒子高度

核心在于监听鼠标的move来改变div的高度&#xff0c;抽成了组件 <template><div ref"container" class"drag"><z-tooltip v-if"isShowIcon" effect"dark" content"格式化" placement"top-start"&…...

JavaScript Es6_4笔记

JavaScript 进阶 文章目录 JavaScript 进阶深浅拷贝浅拷贝深拷贝递归实现深拷贝js库lodash里面cloneDeep内部实现了深拷贝JSON序列化 异常处理throwtry ... catchdebugger 处理this普通函数箭头函数改变this指向callapplybind 防抖节流 深浅拷贝 浅拷贝 首先浅拷贝和深拷贝只…...

Python“牵手”易贝(Ebay)商品列表数据,关键词搜索ebayAPI接口数据,ebayAPI接口申请指南

Ebay平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范&#xff0c; EbayAPI接口是指通过编程的方式&#xff0c;让开发者能够通过HTTP协议直接访问Ebay平台的数据&#xff0c;包括商品信息、店铺信息、物流信息等&#xff0c;从而实现Ebay平…...

C语言:选择+编程(每日一练Day8)

目录 选择题&#xff1a; 题一&#xff1a; 题二&#xff1a; 题三&#xff1a; 题四&#xff1a; 题五&#xff1a; 编程题&#xff1a; 题一&#xff1a;字符个数统计 思路一&#xff1a; 题二&#xff1a;多数元素 思路一&#xff1a; 本人实力有限可能对一些…...

使用 uniapp 适用于wx小程序 - 实现移动端头部的封装和调用

图例&#xff1a;红框区域&#xff0c;使其标题区与胶囊对齐 一、组件 navigation.vue <template><view class"nav_name"><view class"nav-title" :style"{color : props.color, padding-top : toprpx,background : props.bgColor,he…...

ARM Linux 系统稳定性分析入门及渐进 13 -- gdb 反汇编 disassemble 命令详细介绍及举例】

文章目录 1.1 gdb 调试回顾1.1.1 gdb list 命令介绍 1.2 反汇编命令 dis 介绍1.2.1 如何设置 gdb 汇编代码的格式 1.1 gdb 调试回顾 在GNU调试器&#xff08;GDB&#xff09;中&#xff0c;有许多命令可以帮助我们调试应用程序。 gdb: 这是一个强大的Unix下的程序调试工具。以…...

python连接Microsoft SQL Server 数据库

python代码 Author: tkhywang 2810248865qq.com Date: 2023-08-21 11:22:24 LastEditors: tkhywang 2810248865qq.com LastEditTime: 2023-08-21 11:29:30 FilePath: \PythonProject02\Microsoft SQL Server 数据库.py Description: 这是默认设置,请设置customMade, 打开koroFi…...

docker可视化工具

安装Portainer 官方安装说明&#xff1a;https://www.portainer.io/installation/ [rootubuntu1804 ~]#docker pull portainer/portainer[rootubuntu1804 ~]#docker volume create portainer_data portainer_data [rootubuntu1804 ~]#docker run -d -p 8000:8000 -p 9000:90…...

MySQL 用户管理操作

目录 一、用户管理概述 二、用户管理 1、创建用户 2、删除用户 三、账户密码管理 1、root用户修改自己的密码 2、ROOT用户修改其他普通用户密码 3、普通用户修改自己的密码 4、ROOT用户密码忘记解决办法 1&#xff09;Linux系统 2&#xff09;windows系统 四、用户权…...

【python办公自动化】PysimpleGUI中的popup弹窗中的按钮设置居中

PysimpleGUI中的popup弹窗中的按钮设置居中 背景问题解决背景 默认的popup弹窗中的OK按钮是在最下面偏左侧一些,有时需要将按钮放置居中 问题解决 首先找到pysimplegui源代码文件中popup的部分 然后定位到19388行,源文件内容如下 关于popup弹窗OK按钮的设置,将pad属性…...

postgresql 同步流复制两个相关参数synchronous_commit 和 synchronous_standby_names

一&#xff1a;synchronous_commit 1.synchronous_commit参数含义 这个参数用来设置事务提交返回客户端之前&#xff0c;一个事务是否需要等待 WAL 记录被写入磁盘。合法的值是{local,remote_write,remote_apply,on,off}。默认设置是on。 2.各可选值含义 2.1 local 当事务…...

运算放大器发展史

在内部集成了一个补偿电容 MPS公司OP07推出后&#xff0c;大受欢迎。各家厂商都推出了自己的 这4款都是可以替换的...

LVS+Keepalived 实验

Keepalived 是什么 Keepalived 是一个基于VRRP协议来实现的LVS服务高可用方案&#xff0c;可以解决静态路由出现的单点故障问题的一款检查工具 在一个LVS服务集群中通常有主服务器&#xff08;MASTER&#xff09;和备份服务器&#xff08;BACKUP&#xff09;两种角色的服务器…...

FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装

FreeSWITCH 1.10.10 简单图形化界面1 - docker/脚本/ISO镜像安装 0. 界面预览1. Docker安装1.1 下载docker镜像1.2 启动docker镜像1.3 登录 2. 脚本安装2.1 下载2.2 安装2.3 登录2.4 卸载程序 3. 镜像安装3.1 下载镜像3.2 安装镜像3.3 登录 0. 界面预览 http://myfs.f3322.net…...

内网渗透神器CobaltStrike之权限提升(七)

Uac绕过 常见uac攻击模块 UAC-DLL UAC-DLL攻击模块允许攻击者从低权限的本地管理员账户获得更高的权限。这种攻击利用UAC的漏洞&#xff0c;将ArtifactKit生成的恶意DLL复制到需要特权的位置。 适用于Windows7和Windows8及更高版本的未修补版本 Uac-token-duplication 此攻…...

使用haproxy搭建web架构

haproxy HAProxy是一个免费的负载均衡软件&#xff0c;可以运行于大部分主流的Linux操作系统上。 HAProxy提供了可以在七层和四层两种负载均衡能力&#xff0c;它可以提供高可用性、负载均衡、及基于TCP和HTTP应用的代理。适用于负载大的Web站点&#xff0c;在运行在硬件上可…...

Java基础之IO流File类创建及删除

1.File类概述及构造方法 2.File类创建功能 文件创建成功&#xff01; 如果文件不存在&#xff0c;就创建文件&#xff0c;并返回true 如果文件存在&#xff0c;就不创建文件&#xff0c;并返回false 如果文件夹不存在&#xff0c;就创建文件夹&#xff0c;并返回true 如果文件…...

高速道路监控:工业路由器助力高速监控远程管理与维护

工业路由器在物联网应用中扮演着重要的角色。物联网的发展使得大量设备和传感器能够互联互通&#xff0c;而工业路由器作为连接这些设备和网络的中间桥梁&#xff0c;承担着数据传输和安全管理的重要责任。 工业路由器能够为高速监控提供网络功能&#xff0c;实现户外无线网络部…...

【校招VIP】前端基础之post和get

考点介绍&#xff1a; get和post 是网络基础&#xff0c;也是每个前端同学绕不过去的小问题&#xff0c;但是在校招面试中很多同学在基础回答中不到位&#xff0c;或者倒在引申问题里&#xff0c;就丢分了。 『前端基础之post和get』相关题目及解析内容可点击文章末尾链接查看…...

KubeSphere 容器平台高可用:环境搭建与可视化操作指南

Linux_k8s篇 欢迎来到Linux的世界&#xff0c;看笔记好好学多敲多打&#xff0c;每个人都是大神&#xff01; 题目&#xff1a;KubeSphere 容器平台高可用&#xff1a;环境搭建与可视化操作指南 版本号: 1.0,0 作者: 老王要学习 日期: 2025.06.05 适用环境: Ubuntu22 文档说…...

内存分配函数malloc kmalloc vmalloc

内存分配函数malloc kmalloc vmalloc malloc实现步骤: 1)请求大小调整:首先,malloc 需要调整用户请求的大小,以适应内部数据结构(例如,可能需要存储额外的元数据)。通常,这包括对齐调整,确保分配的内存地址满足特定硬件要求(如对齐到8字节或16字节边界)。 2)空闲…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查

在对接支付宝API的时候&#xff0c;遇到了一些问题&#xff0c;记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

RocketMQ延迟消息机制

两种延迟消息 RocketMQ中提供了两种延迟消息机制 指定固定的延迟级别 通过在Message中设定一个MessageDelayLevel参数&#xff0c;对应18个预设的延迟级别指定时间点的延迟级别 通过在Message中设定一个DeliverTimeMS指定一个Long类型表示的具体时间点。到了时间点后&#xf…...

python打卡day49

知识点回顾&#xff1a; 通道注意力模块复习空间注意力模块CBAM的定义 作业&#xff1a;尝试对今天的模型检查参数数目&#xff0c;并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...

React Native在HarmonyOS 5.0阅读类应用开发中的实践

一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强&#xff0c;React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 &#xff08;1&#xff09;使用React Native…...

P3 QT项目----记事本(3.8)

3.8 记事本项目总结 项目源码 1.main.cpp #include "widget.h" #include <QApplication> int main(int argc, char *argv[]) {QApplication a(argc, argv);Widget w;w.show();return a.exec(); } 2.widget.cpp #include "widget.h" #include &q…...

在鸿蒙HarmonyOS 5中使用DevEco Studio实现录音机应用

1. 项目配置与权限设置 1.1 配置module.json5 {"module": {"requestPermissions": [{"name": "ohos.permission.MICROPHONE","reason": "录音需要麦克风权限"},{"name": "ohos.permission.WRITE…...

Rapidio门铃消息FIFO溢出机制

关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系&#xff0c;以下是深入解析&#xff1a; 门铃FIFO溢出的本质 在RapidIO系统中&#xff0c;门铃消息FIFO是硬件控制器内部的缓冲区&#xff0c;用于临时存储接收到的门铃消息&#xff08;Doorbell Message&#xff09;。…...

sipsak:SIP瑞士军刀!全参数详细教程!Kali Linux教程!

简介 sipsak 是一个面向会话初始协议 (SIP) 应用程序开发人员和管理员的小型命令行工具。它可以用于对 SIP 应用程序和设备进行一些简单的测试。 sipsak 是一款 SIP 压力和诊断实用程序。它通过 sip-uri 向服务器发送 SIP 请求&#xff0c;并检查收到的响应。它以以下模式之一…...