go-zero(十二)消息队列
go zero 消息队列
在微服务架构中,消息队列主要通过异步通信实现服务间的解耦,使得各个服务可以独立发展和扩展。
go-zero中使用的队列组件go-queue,是gozero官方实现的基于Kafka和Beanstalkd 的消息队列框架,我们使用kafka作为演示。
一、kafka简单介绍
Kafka 是一个开源的分布式流处理平台,主要用于构建实时数据管道和流应用。
1.Kafka 的架构
Kafka 的架构通常由以下几个部分构成:
- Broker(节点):Kafka 集群由多个 broker 实例组成,负责管理消息的存储和处理。
- Topic(主题):消息以主题的形式组织,每个主题可以有多个分区(partition),以支持高并发和扩展性。
- Produce(生产者):消息生产者将数据发送到特定的topic中。
- Consumer(消费者):消费者从topic中读取数据,可以将多个消费者分组以进行负载均衡。
2.Kafka 的关键特性
-
高吞吐量:
Kafka 设计上能够处理大量的实时数据流,具备非常高的吞吐量。这使得它能够轻松应对大规模数据流量,适合做日志聚合、监控数据处理等。 -
持久性:
Kafka 将消息持久化到磁盘,并提供复制功能,以确保数据的安全性和可靠性。即使在节点出现故障的情况下,也能保证数据不会丢失。 -
可扩展性:
Kafka 能够水平扩展,通过增加更多的节点来处理更多的消费者和生产者,这使得它能够应对越来越多的业务需求。 -
实时处理:
Kafka 提供低延迟的数据传输,这使得实时处理和分析成为可能。您可以瞬时处理到来的数据流。 -
支持多种消息传递模式:
Kafka 支持发布-订阅和点对点的消息传递模式,能够灵活适应不同场景下的需求。 -
强大的生态系统:
Kafka 拥有丰富的生态系统,包括 Kafka Streams 和 Kafka Connect,这些工具可以帮助开发者更方便地进行流处理和数据集成。
3.常见应用场景
-
日志聚合:
Kafka 可以作为一个集中式的日志聚合器,将分布在不同服务的日志集中到一个地方,方便后续分析和监控。 -
实时数据流处理:
使用 Kafka,用户可以实时处理和分析流数据,例如检测异常、生成实时报告等。 -
系统监控和事件追踪:
Kafka 经常用于收集和跟踪系统事件(如用户行为、系统状态等),并通过流式处理进行实时监控。 -
数据集成:
Kafka 可以作为数据的桥梁,连接不同的数据源和目标系统,方便实现数据的流转和转换。 -
消息队列:
Kafka 可用作高效的消息队列,实现服务间的异步通信。例如,在微服务架构中,服务 A 可以将消息发送到 Kafka,而服务 B 可以异步地从 Kafka 中读取处理这些消息。
二、环境部署
1.Docker安装Kafka
配置文件:
version: '3'######## 项目依赖的环境,启动项目之前要先启动此环境 #######services:#zookeeper是kafka的依赖 - Zookeeper is the dependencies of Kafkazookeeper:image: wurstmeister/zookeepercontainer_name: zookeeperenvironment:# 时区上海 - Time zone Shanghai (Change if needed)TZ: Asia/Shanghairestart: alwaysports:- 2181:2181networks:- gozero_net#消息队列 - Message queuekafka:image: 'bitnami/kafka:3.6.2'container_name: kafkarestart: alwaysulimits:nofile:soft: 65536hard: 65536environment:- TZ=Asia/Shanghai- KAFKA_CFG_NODE_ID=0- KAFKA_CFG_PROCESS_ROLES=controller,broker- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,EXTERNAL:PLAINTEXT- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLERports:- '9092:9092'- '9094:9094'volumes:- ./volumes/kafka:/bitnami/kafkanetworks:gozero_net:driver: bridgeipam:config:- subnet: 172.16.0.0/16
这里的kafka 对外暴露的端口为9094
使用命令执行文件
docker compose up
2.创建topic
进入kafka容器
docker exec -it {{容器ID}} /bin/bash
#或者直接使用容器名
docker exec -it kafka /bin/bash
进入kafka执行命令目录
进入kafka执行命令目录
cd /opt/bitnami/kafka/bin
创建topic
创建名为topic-test的topic
./kafka-topics.sh --create --topic topic-test --bootstrap-server localhost:9092
查看topic信息
./kafka-topics.sh --describe --topic topic-test --bootstrap-server localhost:9092
3.测试topic
使用两个终端,进入kafka执行命令目录,执行下面两个命令:
生产消息
./kafka-console-producer.sh --topic topic-test --bootstrap-server localhost:9092
消费消息
./kafka-console-consumer.sh --topic topic-test --bootstrap-server localhost:9092
测试
在生产者输入消息,会自动同步到消费者

4. 拉取依赖
项目中首先要拉取 go-queue 的依赖
go get github.com/zeromicro/go-queue@latest
三、项目演示
1.配置说明
type KqConf struct {service.ServiceConf// Brokers: Kafka 的多个 Broker 节点Brokers []string// Group: 消费者组Group string// Topic: 订阅的 Topic 主题Topic string// Offset: 如果新的 topic Kafka 没有对应的 offset 信息,或者当前的 offset 无效(历史数据被删除),// 需要指定从头(first)消费还是从尾(last)消费Offset string `json:",options=first|last,default=last"`// Conns: 一个 Kafka queue 对应可对应多个 consumer,Conns 对应 Kafka queue 数量,// 可以同时初始化多个 Kafka queue,默认只启动一个Conns int `json:",default=1"`// Consumers: go-queue 内部起多个 goroutine 从 Kafka 中获取信息写入进程内的 channel,// 此参数控制 goroutine 数量(⚠️ 并不是真正消费时的并发 goroutine 数量)Consumers int `json:",default=8"`// Processors: 当 Consumers 中的多个 goroutine 拉取到 Kafka 消息后,// 通过此参数控制当前消费逻辑的并发 goroutine 数量Processors int `json:",default=8"`// MinBytes: fetch 一次返回的最小字节数,若不够该字节数则会等待MinBytes int `json:",default=10240"` // 10K// MaxBytes: fetch 一次返回的最大字节数,若第一条消息大小超过该限制仍会继续拉取,// 以确保 consumer 的正常运行。并非绝对配置,消息大小也受 broker 的 message.max.bytes 限制,// 以及 topic 的 max.message.bytes 限制MaxBytes int `json:",default=10485760"` // 10M// Username: Kafka 的账号(可选)Username string `json:",optional"`// Password: Kafka 的密码(可选)Password string `json:",optional"`
}
2.配置
配置文件
在yaml 配置文件中添加当前的 kafka 配置信息,我这里为了省事就都放在一个配置文件下了:
#....#生产者
KqPusherConf:Name: log-producerBrokers:- 127.0.0.1:9094Group: logs-groupTopic: topic-test
#消费者
KqConsumerConf:Name: log-consumerBrokers:- 127.0.0.1:9094Group: logs-groupTopic: topic-testOffset: lastConsumers: 8Processors: 8
config.go
在 internal/config 下的 config.go 中定义 go 映射的配置
type Config struct {/*.....*/KqPusherConf kq.KqConfKqConsumerConf kq.KqConf
}
svc注入
在 svc/serviceContext.go 中初始化 pusher 的 kq client
type ServiceContext struct {Config config.ConfigKqPusherClient *kq.Pusher}func NewServiceContext(c config.Config) *ServiceContext {return &ServiceContext{Config: c,KqPusherClient: kq.NewPusher(c.KqPusherConf.Brokers, c.KqPusherConf.Topic),}
}
3. 生产者
在 logic 中写业务逻辑使用 go-queue 的 kq client 发送消息到 kafka,这里我们用登录作为演示,当登录成功后,发送用户信息:
func (l *LoginLogic) Login(req *types.LoginRequest) (resp *types.LoginResponse, err error) {// todo: add your logic here and delete this line/*....省略其他代码*///生产者需要异步执行,threading.GoSafe() 是go zero官方对 go func() 的安全封装threading.GoSafe(func() {logData := map[string]any{"user": user.Username,"mobile": user.Mobile,}logs, _ := json.Marshal(logData)// 使用Push推送消息,消息为jsonerr := l.svcCtx.KqPusherClient.Push(l.ctx, string(logs))if err != nil {logx.Errorf("KqPusherClient Push Error , err :%v", err)}})// 如果既没有验证码也没有密码return nil, errors.New(10010, "未提供有效的登录凭证")
}
生产者需要异步执行,threading.GoSafe() 是go zero官方对 go func() 的安全封装,如果出现panics 会自动恢复。
4. 消费者
在 internal 下新建一个 mq 文件夹,在 mq 文件夹下新建一个消费者 consumer.go:
package mqsimport ("beyond/user/api/internal/svc""context""fmt""github.com/zeromicro/go-zero/core/logc""github.com/zeromicro/go-zero/core/logx"
)//定义日志消费者
type LogsConsumer struct {ctx context.ContextsvcCtx *svc.ServiceContext
}// 定义构造方法
func NewLogsConsumer(ctx context.Context, svcCtx *svc.ServiceContext) *LogsConsumer {return &LogsConsumer{ctx: ctx,svcCtx: svcCtx,}
}// Consume为go zero内置接口, 实现Consume接口方法
func (l *LogsConsumer) Consume(ctx context.Context, key, val string) error {//logx.Infof("Consumer key :%s , val :%s", key, val)logc.Infof(ctx, "Consumer key :%s, val :%s", key, val)return nil
}
Consume 为go queue内置接口

因为消费者可能有多个,在 mq 文件夹下新建一个文件mqs.go用来监听多个消费者,mqs.go 代码如下:
package mqsimport ("beyond/user/api/internal/config""beyond/user/api/internal/svc""context""github.com/zeromicro/go-queue/kq""github.com/zeromicro/go-zero/core/service"
)func Consumers(c config.Config, ctx context.Context, svcCtx *svc.ServiceContext) []service.Service {// 监听消费者状态变化return []service.Service{//创建消息队列kq.MustNewQueue(c.KqConsumerConf, NewLogsConsumer(ctx, svcCtx)),}}
在 main.go 中启动 consumers 等待消费
func main() {flag.Parse()var c config.Configconf.MustLoad(*configFile, &c)server := rest.MustNewServer(c.RestConf)defer server.Stop()svcCtx := svc.NewServiceContext(c)handler.RegisterHandlers(server, svcCtx)fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)// 因为现在添加了mq,属于多服务状态,所以需要启动mq服务//server.Start() //创建新的服务组serviceGroup := service.NewServiceGroup()defer serviceGroup.Stop()// 从mq中获取消费者服务,并添加到服务组中for _, mq := range mqs.Consumers(c, context.Background(), svcCtx) {serviceGroup.Add(mq)}//添加原来的server服务serviceGroup.Add(server)// 启动服务组serviceGroup.Start()}
5.启动项目
我们这里就是简单的输出日志,你也可以拓展成发送邮件或者短信给用户,提示用户注册成功。

相关文章:
go-zero(十二)消息队列
go zero 消息队列 在微服务架构中,消息队列主要通过异步通信实现服务间的解耦,使得各个服务可以独立发展和扩展。 go-zero中使用的队列组件go-queue,是gozero官方实现的基于Kafka和Beanstalkd 的消息队列框架,我们使用kafka作为演示。 一、…...
会议通知:人工智能通识教育与实践发展暨和鲸科技AI通识课解决方案发布会
今年秋季学期起,全国多所高校面向本科生开设人工智能通识课。 当前人工智能通识课程的建设进展主要分为三种情况: 全市统筹,由某头部高校牵头建设市级人工智能通识课,以北京市、天津市为代表; 已于秋季学期按照课程…...
UDS自动化测试-Service 0x27(CAPL调用dll实现key计算)
文章目录 关联文章一、CANoe加载诊断数据库cdd、dll文件二、CAPLdiagGenerateKeyFromSeed关联文章 UDS - 深论Security Access Service 27服务-安全访问状态转换 CDD文件——CANdelaStudio Vector——CAPL语言设计 CANoe诊断测试 相信读者基于Diagnostic/ISO TP Confighratio…...
订单编号如何实现
背景 常见的订单编号是带有一些信息的,比如说创建日期例如:本案例中的订单日期 自增编号日期可以使用格式化字符串,自增则可以使用redis来实现 代码实现 redis就有自增的方法 每天的key都是不一样的,且过期时间设置为1天 // 生成…...
Vue3 大事件管理系统
Vue3 项目实战: 🆗好久没有更新blog,最近在找工作,还有准备考试,哎,😶🌫️爆炸的大环境🥲 内卷开始🌯🌯 本篇文章涉及的技术栈速通链接&#x…...
IOS通过WDA自动化中遇到的问题
IOS自动化遇到的问题 搭建WDA环境中遇到的问题1、XCode unsupport iphone xxx.2、创建Bundle Identifier出现问题:Communication with Apple failed3、创建Bundle Identifier出现问题:Automatic signing failed \Signing certificate is invalid4、创建B…...
单独测试 pyautogui 的鼠标点击功能,确保它能够在当前环境中正常工作,鼠标自动点击的录制回放功能
感谢您提供的详细日志信息。根据您的反馈,问题可能出在 pyautogui 没有正确获取鼠标焦点或无法在预期的位置执行点击操作。我们将采取以下步骤来进一步诊断和解决这个问题: 1. **确保 pyautogui 正确执行点击操作**: - 我们将添加更多的调…...
路由引入问题(双点双向路由回馈问题)
简介 总所周知,路由引入import又称路由重分发redistribute,为了解决不同路由协议进程间路由信息不互通而使用的技术,由于不同路由协议的算法、机制、开销等因素的差异,它们之间无法直接交换路由信息。因此,路由引入技…...
设计模式之 适配器模式 C# 范例
在 C# 中,适配器模式(Adapter Pattern)是一种结构型设计模式,旨在将一个类的接口转换成客户端所期待的另一个接口。适配器模式允许你将现有的类包装起来,使其能够与其他接口兼容。 适配器模式的使用场景: …...
LabVIEW实现GPS通信
目录 1、GPS通信原理 2、硬件环境部署 3、程序架构 4、前面板设计 5、程序框图设计 6、测试验证 本专栏以LabVIEW为开发平台,讲解物联网通信组网原理与开发方法,覆盖RS232、TCP、MQTT、蓝牙、Wi-Fi、NB-IoT等协议。 结合实际案例,展示如何利用LabVIEW和常用模块实现物联网系…...
[leetcode100] 543. 二叉树的直径
https://leetcode.cn/problems/diameter-of-binary-tree/description/?envTypestudy-plan-v2&envIdtop-100-liked 题目描述:给一个二叉树,返回二叉树直径最大值。直径指的是二叉树中任意一个结点到另外一个结点产生路径的长度。而长度由边来代表。…...
嵌入式学习(18)-stm32F407串口接收空闲中断+DMA
一、概述 在一些一次性接收大批量数据的引用场合,如果使用接收中断会频繁的进入接收中断影响代码的运行效率。为了解决这个问题可以使用串口的空闲中断DMA实现。 二、应用 在网上招了一些例程在STM32F407的平台上都没有跑通会出现各种异常,主要原因还…...
b站视频爬虫-词云分析
一、设置爬虫程序 # requests 请求b站视频 import jsonimport fake_useragent import requests from lxml import etreeif __name__ == __main__:# UA伪装head = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like …...
如何防止订单二次重复支付?
如何防止订单二次重复支付? 在电商平台和支付系统中,防止订单二次重复支付是一个至关重要的功能。以下是一些常见的策略和技术手段,用于确保订单支付的幂等性和一致性。 目录 唯一订单号订单状态检查数据库事务乐观锁悲观锁支付渠道状态核查…...
LeetCode 24反转链表
单链表反转:详细解析与代码实现 在数据结构的学习过程中,链表是一个非常重要且有趣的部分,而单链表的反转操作更是常考的基础知识点。今天就来和大家详细讲讲如何实现单链表的反转,并通过代码示例来加深理解呀。 题目 给定单链…...
用python的flask写的一个MQTT中转功能,http的方式发送数据和接收数据
需求背景 给一个客户对接人脸识别的设备,最后需要通知服务端进行一些消息推送。 简单例子 # 作者 陈老师 # https://v.iiar.cn import json import paho.mqtt.client as mqtt import requests from flask import Flask, requestapp Flask(__name__)# MQTT配置 mq…...
img引入svg如何修改颜色
方法1:通过css中filter:drop-shadow 首先需要一个容纳图标的父盒子(下方实例中的.svg-img),通过css造一个图标的‘影子’(.svg-color中的drop-shadow),然后设置‘影子’的颜色,再把图标本体移出父盒子&…...
计算机毕业设计PySpark+PyFlink+Hive地震预测系统 地震数据分析可视化 地震爬虫 大数据毕业设计 Hadoop 机器学习 深度学习
温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 温馨提示:文末有 CSDN 平台官方提供的学长联系方式的名片! 作者简介:Java领…...
【Python】使用Numpy实现余弦相似度计算
本文详细介绍了如何使用 NumPy 实现两个向量之间的余弦相似度计算,帮助理解向量相似度在推荐系统、文本处理等领域的应用。 1. 余弦相似度定义 余弦相似度是衡量两个向量在高维空间中夹角大小的指标,其公式为: c o s ( θ ) A ⋅ B ∥ A ∥…...
nginx中的root和alias的区别
alias 在E:\\test\\目录下创建一个index.html文件 在nginx.conf文件配置alias,路径填写为绝对路径,但是要注意,这里结尾是文件夹的名字 然后下面的/aa/ 是随便起的名字,也不是文件夹的名字,在浏览器访问的使用的 在浏览器使用 …...
从WWDC看苹果产品发展的规律
WWDC 是苹果公司一年一度面向全球开发者的盛会,其主题演讲展现了苹果在产品设计、技术路线、用户体验和生态系统构建上的核心理念与演进脉络。我们借助 ChatGPT Deep Research 工具,对过去十年 WWDC 主题演讲内容进行了系统化分析,形成了这份…...
Mybatis逆向工程,动态创建实体类、条件扩展类、Mapper接口、Mapper.xml映射文件
今天呢,博主的学习进度也是步入了Java Mybatis 框架,目前正在逐步杨帆旗航。 那么接下来就给大家出一期有关 Mybatis 逆向工程的教学,希望能对大家有所帮助,也特别欢迎大家指点不足之处,小生很乐意接受正确的建议&…...
【网络安全产品大调研系列】2. 体验漏洞扫描
前言 2023 年漏洞扫描服务市场规模预计为 3.06(十亿美元)。漏洞扫描服务市场行业预计将从 2024 年的 3.48(十亿美元)增长到 2032 年的 9.54(十亿美元)。预测期内漏洞扫描服务市场 CAGR(增长率&…...
ESP32读取DHT11温湿度数据
芯片:ESP32 环境:Arduino 一、安装DHT11传感器库 红框的库,别安装错了 二、代码 注意,DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...
React Native在HarmonyOS 5.0阅读类应用开发中的实践
一、技术选型背景 随着HarmonyOS 5.0对Web兼容层的增强,React Native作为跨平台框架可通过重新编译ArkTS组件实现85%以上的代码复用率。阅读类应用具有UI复杂度低、数据流清晰的特点。 二、核心实现方案 1. 环境配置 (1)使用React Native…...
【项目实战】通过多模态+LangGraph实现PPT生成助手
PPT自动生成系统 基于LangGraph的PPT自动生成系统,可以将Markdown文档自动转换为PPT演示文稿。 功能特点 Markdown解析:自动解析Markdown文档结构PPT模板分析:分析PPT模板的布局和风格智能布局决策:匹配内容与合适的PPT布局自动…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
RNN避坑指南:从数学推导到LSTM/GRU工业级部署实战流程
本文较长,建议点赞收藏,以免遗失。更多AI大模型应用开发学习视频及资料,尽在聚客AI学院。 本文全面剖析RNN核心原理,深入讲解梯度消失/爆炸问题,并通过LSTM/GRU结构实现解决方案,提供时间序列预测和文本生成…...
听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...
Python ROS2【机器人中间件框架】 简介
销量过万TEEIS德国护膝夏天用薄款 优惠券冠生园 百花蜂蜜428g 挤压瓶纯蜂蜜巨奇严选 鞋子除臭剂360ml 多芬身体磨砂膏280g健70%-75%酒精消毒棉片湿巾1418cm 80片/袋3袋大包清洁食品用消毒 优惠券AIMORNY52朵红玫瑰永生香皂花同城配送非鲜花七夕情人节生日礼物送女友 热卖妙洁棉…...
