当前位置: 首页 > news >正文

golang如何使用rocketmq 附加闭坑指南 建议收藏!!!

文章目录

  • 前言
  • 一、rocketmq是什么?
  • 二、rocketmq核心概念
  • 三、rocketmq核心应用
  • 四、go如何使用rocketmq
  • 总结


前言

当我们的业务达到一定规模,很多业务需要解耦,以及需要流量削峰的时候,我们需要使用MQ来让我们系统能够正常运转。

一、rocketmq是什么?

RocketMQ是由阿里捐赠给Apache的一款低延迟、高并发、高可用、高可靠的分布式消息中间件。经历了淘宝双十一的洗礼。RocketMQ既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

二、rocketmq核心概念

  • Topic:消息主题,一级消息类型,生产者向其发送消息。
  • Message:生产者向Topic发送并最终传送给消费者的数据消息的载体。
  • 消息属性:生产者可以为消息定义的属性,包含Message Key和Tag。
  • Message Key:消息的业务标识,由消息生产者(Producer)设置,唯一标识某个业务逻辑。
  • Message ID:消息的全局唯一标识,由消息队列RocketMQ系统自动生成,唯一标识某条消息。
  • Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类
  • Producer:也称为消息发布者,负责生产并发送消息至Topic。
  • Consumer:也称为消息订阅者,负责从Topic接收并消费消息。
  • 分区:即Topic Partition,物理上的概念。每个Topic包含一个或多个分区。
  • 消费位点:每个Topic会有多个分区,每个分区会统计当前消息的总条数,这个称为最大位点MaxOffset;分区的起始位置对应的位置叫做起始位点MinOffset。
  • Group:一类生产者或消费者,这类生产者或消费者通常生产或消费同一类消息,且消息发布或订阅的逻辑一致。
  • Group ID:Group的标识。
  • 队列:个Topic下会由一到多个队列来存储消息。
  • Exactly-Once投递语义:Exactly-Once投递语义是指发送到消息系统的消息只能被Consumer处理且仅处理一次,即使Producer重试消息发送导致某消息重复投递,该消息在Consumer也只被消费一次。
    集群消费:一个Group ID所标识的所有Consumer平均分摊消费消息。例如某个Topic有9条消息,一个* Group ID有3个Consumer实例,那么在集群消费模式下每个实例平均分摊,只消费其中的3条消息。
  • 广播消费:一个Group ID所标识的所有Consumer都会各自消费某条消息一次。例如某个Topic有9条消息,一个Group ID有3个Consumer实例,那么在广播消费模式下每个实例都会各自消费9条消息。
  • 定时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息。
  • 延时消息:Producer将消息发送到消息队列RocketMQ服务端,但并不期望这条消息立马投递,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
  • 事务消息:RocketMQ提供类似X/Open XA的分布事务功能,通过消息队列RocketMQ的事务消息能达到分布式事务的最终一致。
  • 顺序消息:RocketMQ提供的一种按照顺序进行发布和消费的消息类型,分为全局顺序消息和分区顺序消息。
  • 全局顺序消息:对于指定的一个Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。
  • 分区顺序消息:对于指定的一个Topic,所有消息根据Sharding Key进行区块分区。同一个分区内的消息按照严格的FIFO顺序进行发布和消费。Sharding Key是顺序消息中用来区分不同分区的关键字段,和普通消息的Message Key是完全不同的概念。
  • 消息堆积:Producer已经将消息发送到消息队列RocketMQ的服务端,但由于Consumer消费能力有限,未能在短时间内将所有消息正确消费掉,此时在消息队列RocketMQ的服务端保存着未被消费的消息,该状态即消息堆积。
  • 消息过滤:Consumer可以根据消息标签(Tag)对消息进行过滤,确保Consumer最终只接收被过滤后的消息类型。消息过滤在消息队列RocketMQ的服务端完成。
  • 消息轨迹:在一条消息从Producer发出到Consumer消费处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从Producer发出,经由消息队列RocketMQ服务端,投递给Consumer的完整链路,方便定位排查问题。
  • 重置消费位点:以时间轴为坐标,在消息持久化存储的时间范围内(默认3天),重新设置Consumer对已订阅的Topic的消费进度,设置完成后Consumer将接收设定时间点之后由Producer发送到消息队列RocketMQ服务端的消息。
  • 死信队列:死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明Consumer在正常情况下无法正确地消费该消息。此时,消息队列RocketMQ不会立刻将消息丢弃,而是将这条消息发送到该Consumer对应的特殊队列中。 消息队列RocketMQ将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

三、rocketmq核心应用

  • 削峰填谷:诸如秒杀、抢红包、企业开门红等大型活动时皆会带来较高的流量脉冲,或因没做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,消息队列RocketMQ可提供削峰填谷的服务来解决该问题。
  • 异步解耦:交易系统作为淘宝和天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等等,整体业务系统庞大而且复杂,消息队列RocketMQ可实现异步通信和应用解耦,确保主站业务的连续性。
  • 顺序收发:细数日常中需要保证顺序的应用场景非常多,例如证券交易过程时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等等。与先进先出FIFO(First In First Out)原理类似,消息队列RocketMQ提供的顺序消息即保证消息FIFO。
    分布式事务一致性:交易系统、支付红包等场景需要确保数据的最终一致性,大量引入消息队列
  • RocketMQ的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
  • 大数据分析:数据在“流动”中产生价值,传统数据分析大多是基于批量计算模型,而无法做到实时的数据分析,利用阿里云消息队列RocketMQ与流式计算引擎相结合,可以很方便的实现业务数据的实时分析。
  • 分布式缓存同步:天猫双11大促,各个分会场琳琅满目的商品需要实时感知价格变化,大量并发访问数据库导致会场页面响应时间长,集中式缓存因带宽瓶颈,限制了商品变更的访问流量,通过消息队列RocketMQ构建分布式缓存,实时通知商品数据的变化。

四、go如何使用rocketmq

  1. 导入rocketmq包
    go get github.com/apache/rocketmq-client-go/v2
  2. 初始化消费者
package rocketmqimport ("context""fmt""gin/common/alarm""gin/common/function"consumer2 "gin/config/extra/consumer""gin/lib""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer""github.com/apache/rocketmq-client-go/v2/rlog""github.com/tidwall/gjson""reflect""sync"
)var p rocketmq.Producerfunc InitProducer() {p, _ = rocketmq.NewProducer(producer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),producer.WithRetry(2),producer.WithGroupName("gin-hong-api"),)err := p.Start()if err != nil {fmt.Printf("start producer error: %s", err.Error())}
}func SendMessage(topic string, message string) {msg := &primitive.Message{Topic: topic,Body:  []byte(message),}res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("send message error: %s\n", err)} else {fmt.Printf("send message success: result=%s\n", res.String())}
}

3 初始化消费者

func InitConsumer() {c, err := rocketmq2.NewPushConsumer(consumer.WithGroupName("gin-hong-api"),consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),)if err != nil {rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), nil)}wg := new(sync.WaitGroup)wg.Add(1)defer func() {if err := recover(); err != nil {errMsg := function.ErrorToString(err)trace := function.PrintStackTrace()alarm.DingDing(errMsg + trace)}}()c.Subscribe(s, consumer.MessageSelector{}, func(ctx context.Context,msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for _, message := range msgs {msg := string(message.Body)fmt.Println(msg)}return consumer.ConsumeSuccess, nil})err := c.Start()if err != nil {rlog.Fatal(fmt.Sprintf("fail to new pullConsumer: %s", err), nil)}wg.Wait()
}
  1. 需要注意的点
  • 一个消费者组中的功能应该一致,防止遗漏消费
  • 一个Topic中的所有队列和一个消费者组中的消费者是多对一的关系,防止重复消费
  • 一个消费者组对应消费者集群,一个消费者对应一个进程。同组中的消费者可以是一个机器中的多个进程;也可以是多个机器中的进程(建立集群,防止宕机导致服务不可用)
  • 在一个进程中,可以并发运行不同消费者组的消费者实例,但不能并发运行同组的消费者线程。Go SDK内部也没有提供单例线程数的设置。
  • 一个消费者组可以订阅多个Topic
  • 所有消费者组由nameserve统一管理

总结

由于rocketmq go 包使用pullConsumer还没实现。所以目前只能使用Subscribe来消费。

相关文章:

golang如何使用rocketmq 附加闭坑指南 建议收藏!!!

文章目录前言一、rocketmq是什么?二、rocketmq核心概念三、rocketmq核心应用四、go如何使用rocketmq总结前言 当我们的业务达到一定规模,很多业务需要解耦,以及需要流量削峰的时候,我们需要使用MQ来让我们系统能够正常运转。 一…...

C++实现的二叉树创建和遍历,超入门邻家小女也懂了

目录 二叉树 特点 性质 二叉树的创建 声明 创建 -> 成员运算符 批量创建 二叉树的遍历 先序遍历 中序遍历 后序遍历 层序遍历 树的相关术语 特殊二叉树 满二叉树 完全二叉树 二叉树 树(Tree)是n(n≥0)个节点的有限集。在任意一棵…...

如何写出高质量的业务接口

清晰的需求 需求要有文档;方便后续追溯或交接等需求是基础,必须详细;多和需求沟通确认,不可模糊、模棱两可,否则后续可能越错越远 抽象建模 分析需求;梳理清楚关联关系,建立数据模型和关联画E-R…...

3.8多线程

案例一-线程安全的单例模式(面试)是一种设计模式,设计模式针对写代码时的一些常见场景给出一些经典解决方案单例模式的两种典型实现饿汉模式懒汉模式饿汉的单例模式:比较着急去进行创建实例懒汉的单例模式,是不太着急创建实例,,只是在用的时候,才真正创建这个是类对象,也就是.c…...

图文讲解MongoDB该怎么安装

一、安装前必读 我这里是Centos7 Linux 内核 注意:本文的命令使用的是 root 用户登录执行,不是 root 的话所有命令前面要加 sudo 二、环境配置 2.1 停止防火墙 systemctl status firewalld #查看firewall systemctl stop firewalld …...

「ML 实践篇」机器学习项目落地

文章目录1. 项目分析1. 框架问题2. 性能指标2. 获取数据1. 准备工作区2. 下载数据3. 查看数据4. 创建测试集3. 数据探索1. 地理位置可视化2. 寻找相关性3. 组合属性4. 数据准备1. 数据清理2. Scikit-Learn 的设计3. 处理文本、分类属性4. 自定义转换器5. 特征缩放6. 流水线5. 选…...

c++面试技巧-基础篇3

1.面试官:什么是函数的重载? 应聘者:函数的重载就是允许使用同一个函数名来定义多个函数,但是这些函数的参数个数和类型不同。 2.面试官:如何引用一个已经定义过的全局变量? 应聘者:可以用引…...

MySQL OCP888题解044-从服务器上导入mysql模式数据后的权限问题

文章目录1、原题1.1、英文原题1.2、中文翻译1.3、答案2、题目解析2.1、题干解析2.2、选项解析3、知识点3.1、知识点1:mysqldump的--flush-privileges选项3.2、知识点2:mysqldump的--all-databases选项3.3、知识点3:mysqldump默认不转储的内容…...

实战小项目之视频监控(1-2)

实战小项目之视频监控(1-2) Nginx 移植 前面也给大家提到了,我们可以使用 Nginx 来搭建 RTMP 流媒体服务器,譬如你可以在一台公网 IP 主 机上搭建流媒体服务器,当然,笔者并没有这个条件;这里我…...

人工智能基础--AI作业1-ML基础

1.监督学习和无监督学习都是机器学习中常用的方法。监督学习是一种机器学习方法,其中机器学习算法根据给定的输入数据和其对应的输出标签进行训练,以学习如何预测新的输入数据的输出标签。简单来说,监督学习就是通过已知的数据进行学习&#…...

关于JS中this对象指向问题总结

一、前言 关于JS中this对象指向问题,相信做过项目的小伙伴多多少少都会遇到过,明明感觉代码写的没问题,可是运行的时候,就会报错,比如报错 xxx is not a function。 我最近也遇到了,百度学习了不少前辈对于…...

Codeforces Round 855 (Div. 3) A-E2

比赛链接:Dashboard - Codeforces Round 855 (Div. 3) - Codeforces A:模拟 题意:给定一个字符串,问这个字符串是不是猫叫。定义是猫叫得字符串: 1:必须由大写或小写得M(m),E&…...

Spark Yarn 运行环境搭建

文章目录Spark Yarn 运行环境搭建1、解压缩文件2、修改配置环境文件3、配置历史服务器Spark Yarn 运行环境搭建 1、解压缩文件 将spark3.2.3的压缩包上传到 linux /opt/software 目录下 输入命令: tar -zxvf spark-3.2.3-bin-hadoop3.2-scala2.13.tgz -C /opt/ 解…...

SpringMVC 页面跳转指南:转发和重定向的实现与比较

SpringMVC 是一款非常流行的 Java Web 框架,它提供了丰富的特性和功能,使得开发者可以轻松地开发 Web 应用程序。其中,转发和重定向是 SpringMVC 中非常常见的两个操作,它们可以用于控制请求的流转和页面的跳转。本文将深入探讨 S…...

ModStartCMS v5.9.0 后台浅色模式,系统样式升级

ModStart 是一个基于 Laravel 模块化极速开发框架。模块市场拥有丰富的功能应用,支持后台一键快速安装,让开发者能快的实现业务功能开发。 系统完全开源,基于 Apache 2.0 开源协议,免费且不限制商业使用。 功能特性 丰富的模块市…...

2020蓝桥杯真题反倍数 C语言/C++

题目描述 给定三个整数 a,b,c,如果一个整数既不是 a 的整数倍也不是 b 的整数倍还不是 c 的整数倍,则这个数称为反倍数。 请问在 1 至 n 中有多少个反倍数。 输入描述 输入的第一行包含一个整数 n。 第二行包含三个整数a,b,c,相邻两个数之…...

PTA:L1-025 正整数A+B、L1-026 I Love GPLT、L1-027 出租(C++)

目录 L1-025 正整数AB 问题描述: 实现代码: L1-026 I Love GPLT 问题描述: 实现代码: L1-027 出租 问题描述: 实现代码: 原理思路: 出租那道题有点意思哈 L1-025 正整数AB 问题描述…...

状态机的Go语言实现版本

一、状态机 1. 定义 有限状态机(Finite-state machine, FSM),简称状态机,是表示有限个状态以及在这些状态之间的转移和动作等行为的数学模型。 2. 组成要素 现态(src state):事务当前所处的状…...

第2章 线程安全与共享资源竞争

第2章 线程安全与共享资源竞争 2.1 synchronized同步介绍 synchronized要解决的是共享资源冲突的问题。当共享资源被任务使用时,要对资源提前加锁。所有任务都采用抢占模式,即某个任务会抢先对共享资源加上第一把锁。如果这是一个排他锁,…...

77. writerows写入多行

文章目录1. 目标任务2. 准备工作3. writerow单行写入4. writerows多行写入5. a以追加的模式写入值6. 总结1. 目标任务 新建【各班级成绩】文件夹; 在该文件夹下新建一个【1班成绩单.csv】文件; 在该文件中写入下面的内容: 成绩 姓名 刘一…...

CTF show Web 红包题第六弹

提示 1.不是SQL注入 2.需要找关键源码 思路 进入页面发现是一个登录框,很难让人不联想到SQL注入,但提示都说了不是SQL注入,所以就不往这方面想了 ​ 先查看一下网页源码,发现一段JavaScript代码,有一个关键类ctfs…...

盘古信息PCB行业解决方案:以全域场景重构,激活智造新未来

一、破局:PCB行业的时代之问 在数字经济蓬勃发展的浪潮中,PCB(印制电路板)作为 “电子产品之母”,其重要性愈发凸显。随着 5G、人工智能等新兴技术的加速渗透,PCB行业面临着前所未有的挑战与机遇。产品迭代…...

PHP和Node.js哪个更爽?

先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...

家政维修平台实战20:权限设计

目录 1 获取工人信息2 搭建工人入口3 权限判断总结 目前我们已经搭建好了基础的用户体系,主要是分成几个表,用户表我们是记录用户的基础信息,包括手机、昵称、头像。而工人和员工各有各的表。那么就有一个问题,不同的角色&#xf…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...

04-初识css

一、css样式引入 1.1.内部样式 <div style"width: 100px;"></div>1.2.外部样式 1.2.1.外部样式1 <style>.aa {width: 100px;} </style> <div class"aa"></div>1.2.2.外部样式2 <!-- rel内表面引入的是style样…...

工业自动化时代的精准装配革新:迁移科技3D视觉系统如何重塑机器人定位装配

AI3D视觉的工业赋能者 迁移科技成立于2017年&#xff0c;作为行业领先的3D工业相机及视觉系统供应商&#xff0c;累计完成数亿元融资。其核心技术覆盖硬件设计、算法优化及软件集成&#xff0c;通过稳定、易用、高回报的AI3D视觉系统&#xff0c;为汽车、新能源、金属制造等行…...

Android15默认授权浮窗权限

我们经常有那种需求&#xff0c;客户需要定制的apk集成在ROM中&#xff0c;并且默认授予其【显示在其他应用的上层】权限&#xff0c;也就是我们常说的浮窗权限&#xff0c;那么我们就可以通过以下方法在wms、ams等系统服务的systemReady()方法中调用即可实现预置应用默认授权浮…...

JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案

JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停​​ 1. ​​安全点(Safepoint)阻塞​​ ​​现象​​:JVM暂停但无GC日志,日志显示No GCs detected。​​原因​​:JVM等待所有线程进入安全点(如…...

Linux离线(zip方式)安装docker

目录 基础信息操作系统信息docker信息 安装实例安装步骤示例 遇到的问题问题1&#xff1a;修改默认工作路径启动失败问题2 找不到对应组 基础信息 操作系统信息 OS版本&#xff1a;CentOS 7 64位 内核版本&#xff1a;3.10.0 相关命令&#xff1a; uname -rcat /etc/os-rele…...