当前位置: 首页 > news >正文

在Go中迅速使用RabbitMQ

文章目录

  • 1 认识
    • 1.1 MQ分类
    • 1.2 安装
    • 1.3 基本流程
  • 2 [Work模型](https://www.rabbitmq.com/tutorials/tutorial-two-go#preparation)
  • 3 交换机
    • 3.1 fanout
    • 3.2 direct
    • 3.3 [topic](https://www.rabbitmq.com/tutorials/tutorial-five-go)
  • 4 Golang创建交换机/队列/Publish/Consume/Bind
  • 5 可靠性
    • 5.1 生产者可靠性
    • 5.2 MQ可靠性
      • 5.2.1 Lazy Queue
    • 5.3 消费者可靠性
    • 5.4 业务幂等性
    • 5.4 Golang实现可靠性
      • 1. 确保消息生产者的可靠性
      • 2. 确保消息队列的可靠性
      • 3. 确保消息消费者的可靠性
      • 4. 容错处理
  • 6 延迟消息
    • 6.1 死信交换机
    • 6.2 延迟消息插件
      • 6.2.1 安装
      • 6.2.2 使用
      • 6.2.3 应用场景

  • 为什么要使用消息队列

image-20240903160417835

1 认识

1.1 MQ分类

  • 有Broker

    • 重Topic —— 在整个broker中,依据topic来进行消息中转。在重topic的MQ中必然需要topic —— kafka
    • 轻Topic —— topic只是一种中转模式 —— rabbitMQ
  • 无Broker

1.2 安装

# latest RabbitMQ 3.13
docker run \-e RABBITMQ_DEFAULT_USER=dusong \  #默认账号和密码均为:guest-e RABBITMQ_DEFAULT_PASS=123123 \-d \  #detached mode-v mq-plugins:/plugins \   #插件挂载--rm \--name rabbitmq \-p 5672:5672 \    #消息通信端口-p 15672:15672 \  #管理界面端口rabbitmq:3.13-management

1.3 基本流程

image-20240904110326213

  • exchange只能转发消息,不能存储消息
  • 通过bind将queue绑定到exchange

2 Work模型

  • 多个消费者绑定到一个队列

  • 同一个消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量(不设置默认平均平均分配)

    image-20240904144344585

    err = ch.Qos(1,     // prefetch count0,     // prefetch sizefalse, // global
    )
    

3 交换机

3.1 fanout

fanout类型的交换机会将消息转发给所有绑定到改交换机的队列

3.2 direct

image-20240904151234823

err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
)
failOnError(err, "Failed to declare an exchange")ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()body := bodyFrom(os.Args)
err = ch.PublishWithContext(ctx,"logs_direct",         // exchange"log", // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),
})

3.3 topic

image-20240904151944421

4 Golang创建交换机/队列/Publish/Consume/Bind

  • 创建交换机

    err = ch.ExchangeDeclare("logs_direct", // name"direct",      // typetrue,          // durablefalse,         // auto-deletedfalse,         // internalfalse,         // no-waitnil,           // arguments
    )
    
  • 创建队列

    q, err := ch.QueueDeclare("hello", // namefalse,   // durable(是否持久化)false,   // delete when unusedfalse,   // exclusivefalse,   // no-waitnil,     // arguments
    )
    
  • 绑定

    err = ch.QueueBind(q.Name,        // queue name"log",             // routing key"logs_direct", // exchangefalse,nil
    )
    
  • 发送

    body := "this is log"
    err = ch.PublishWithContext(ctx,"logs_direct",         // exchange"log", // routing keyfalse,                 // mandatoryfalse,                 // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})
    
  • 接收

    msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto ackfalse,  // exclusivefalse,  // no localfalse,  // no waitnil,    // args
    )
    

5 可靠性

5.1 生产者可靠性

  • 生产者重连

  • 生产者确认(ack)

5.2 MQ可靠性

  • 交换机/队列持久化
  • 消息持久化

5.2.1 Lazy Queue

image-20240904172117264

image-20240904163818387

5.3 消费者可靠性

  • 消费者确认机制

    image-20240904172521990

5.4 业务幂等性

  • 消费者因为保证可靠性可能消费业务多次,因此需要保证业务幂等性
  1. 给消息加上uuid
  2. 在业务逻辑上做修改

5.4 Golang实现可靠性

在使用 RabbitMQ 的 Go 应用程序中,要确保消息的可靠性,通常需要从以下几个方面入手:

1. 确保消息生产者的可靠性

  • 消息确认(Publisher Confirms): 开启 RabbitMQ 的发布确认模式。通过调用 Channel.Confirm() 方法,让 RabbitMQ 服务器在成功接收并持久化消息后向生产者发送确认。这样可以确保生产者知道消息已被可靠接收。

    ch.Confirm(false) // 启用发布确认模式
    confirm := ch.NotifyPublish(make(chan amqp.Confirmation, 1))// 发布消息
    err = ch.Publish(exchange, routingKey, mandatory, immediate, msg)
    if err != nil {// 处理发布失败的情况
    }select {
    case confirmed := <-confirm:if confirmed.Ack {fmt.Println("消息已确认")} else {fmt.Println("消息未确认")}
    case <-time.After(time.Second * 5):fmt.Println("消息确认超时")
    }
    
  • 消息持久化(Message Durability): 将消息标记为持久化,以确保即使 RabbitMQ 服务器重启,消息也不会丢失。通过设置 DeliveryModeamqp.Persistent 来实现:

    msg := amqp.Publishing{DeliveryMode: amqp.Persistent,ContentType:  "text/plain",Body:         []byte("Hello, RabbitMQ!"),
    }
    

2. 确保消息队列的可靠性

  • 队列持久化(Queue Durability): 创建队列时,将其声明为持久化队列。这样即使 RabbitMQ 服务器重启,队列依然存在。

    _, err = ch.QueueDeclare("my_queue",  // 队列名true,        // 是否持久化false,       // 是否自动删除false,       // 是否排他false,       // 是否阻塞nil,         // 其他参数
    )
    if err != nil {log.Fatalf("Failed to declare a queue: %s", err)
    }
    

3. 确保消息消费者的可靠性

  • 手动确认(Manual Acknowledgment): 消费者手动确认接收到的消息。这样只有在消息成功处理后,RabbitMQ 才会将其从队列中移除。如果消费者没有确认消息且发生故障,RabbitMQ 会将消息重新投递。

    msgs, err := ch.Consume("my_queue", // 队列名"",         // 消费者标识false,      // 自动确认false,      // 是否排他false,      // 是否阻塞false,      // 是否在同一个连接上消费nil,        // 其他参数
    )
    if err != nil {log.Fatalf("Failed to register a consumer: %s", err)
    }for d := range msgs {// 处理消息fmt.Printf("Received a message: %s", d.Body)// 手动确认d.Ack(false)
    }
    
  • QoS(Quality of Service): 设置消费者的 QoS 参数,例如 prefetch_count,确保消费者不会一次处理太多消息,从而导致过载。

    err = ch.Qos(1,    // 每次处理一条消息0,    // 消息大小限制(不限制)false, // 是否应用于整个通道
    )
    if err != nil {log.Fatalf("Failed to set QoS: %s", err)
    }
    

4. 容错处理

  • 重试机制: 在生产者和消费者中实现重试机制,例如使用带有指数回退的重试逻辑,以应对 RabbitMQ 不可用或网络波动的情况。

  • 死信队列(DLX): 配置死信队列,将处理失败的消息路由到指定的死信队列,方便后续分析和处理。

通过这些措施,可以有效提高使用 RabbitMQ 时的消息可靠性。

6 延迟消息

6.1 死信交换机

image-20240905145924200

6.2 延迟消息插件

6.2.1 安装

  1. https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

  2. 将插件放在该目录

    image-20240905153455222

  3. docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq-delayed-message-exchange

6.2.2 使用

 // 3. 声明延迟交换机err = ch.ExchangeDeclare("delay_exchange",               // 交换机名称"x-delayed-message",            // 交换机类型true,                           // 是否持久化false,                          // 是否自动删除false,                          // 是否内部使用false,                          // 是否等待amqp.Table{"x-delayed-type": "direct"}, // 交换机类型的设置)failOnError(err, "Failed to declare an exchange")// 4. 发送消息body := "Hello World with delay"err = ch.Publish("delay_exchange", // 交换机名称"routing_key",    // 路由键false,            // 是否强制发送false,            // 是否立即发送amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),Headers: amqp.Table{"x-delay": int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)},})

6.2.3 应用场景

  • 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景

image-20240905155732697
false, // 是否立即发送
amqp.Publishing{
ContentType: “text/plain”,
Body: []byte(body),
Headers: amqp.Table{
“x-delay”: int32(5000), // 延迟时间,单位为毫秒 (5秒延迟)
},
})


### 6.2.3 应用场景- 消息内部维护一个计时器,延迟消息对CPU的消耗较高,适用于延迟时间较短的场景[外链图片转存中...(img-eA0QMPnx-1725527666228)]

相关文章:

在Go中迅速使用RabbitMQ

文章目录 1 认识1.1 MQ分类1.2 安装1.3 基本流程 2 [Work模型](https://www.rabbitmq.com/tutorials/tutorial-two-go#preparation)3 交换机3.1 fanout3.2 direct3.3 [topic](https://www.rabbitmq.com/tutorials/tutorial-five-go) 4 Golang创建交换机/队列/Publish/Consume/B…...

Windows JDK安装详细教程

一、关于JDK 1.1 简介 Java是一种广泛使用的计算机编程语言&#xff0c;拥有跨平台、面向对象、泛型编程的特性&#xff0c;广泛应用于企业级Web应用开发和移动应用开发。 JDK&#xff08;Java Development Kit&#xff09;是用于开发 Java 应用程序的工具包。它由以下几个主要…...

Ribbon负载均衡底层原理

springcloude服务实例与服务实例之间发送请求&#xff0c;首先根据服务名注册到nacos&#xff0c;然后发送请求&#xff0c;nacos可以根据服务名找到对应的服务实例。 SpringCloudRibbon的底层采用了一个拦截器&#xff0c;拦截了openfeign发出的请求&#xff0c;对地址做了修…...

【C语言可变参数函数的使用与原理分析】

文章目录 1 前言2 实例2.1实例程序2.2程序执行结果2.3 程序分析 3 补充4 总结 1 前言 在编程过程中&#xff0c;有时会遇到需要定义参数数量不固定的函数的情况。 C语言提供了一种灵活的解决方案&#xff1a;变参函数。这种函数能够根据实际调用时的需求&#xff0c;接受任意…...

【笔记】Java EE应用开发环境配置(JDK+Maven+Tomcat+MySQL+IDEA)

一、安装JDK17 1.下载JDK17 https://download.oracle.com/java/17/archive/jdk-17.0.7_windows-x64_bin.zip 2.配置环境变量 下载后&#xff0c;解压到本地&#xff08;目录中最好不要有中文或特殊字符&#xff09; 打开【控制面板】-【系统和安全】-【系统】-【高级系统…...

一文讲懂扩散模型

一文讲懂扩散模型 扩散模型&#xff08;Diffusion Models, DM&#xff09;是近年来在计算机视觉、自然语言处理等领域取得显著进展的一种生成模型。其思想根源可以追溯到非平衡热力学&#xff0c;通过模拟数据的扩散和去噪过程来生成新的样本。以下将详细阐述扩散模型的基本原理…...

学习笔记八:基于Jenkins+k8s+Git+DockerHub等技术链构建企业级DevOps容器云平台

基于Jenkinsk8sGitDockerHub等技术链构建企业级DevOps容器云平台 测试jenkins的CI/CD在Jenkins中安装kubernetes插件安装blueocean插件配置jenkins连接到我们存在的k8s集群配置pod-template添加自己的dockerhub凭据测试通过Jenkins部署应用发布到k8s开发环境、测试环境、生产环…...

科研绘图系列:R语言柱状图分布(histogram plot)

文章目录 介绍加载R包读取数据画图介绍 柱状图(Bar Chart)是一种常用的数据可视化图表,用于展示和比较不同类别或组的数据。它通过在二维平面上绘制一系列垂直或水平的柱子来表示数据的大小,每个柱子的长度或高度代表一个数据点的数值。柱状图非常适合于展示分类数据的分布…...

vue3+ts封装类似于微信消息的组件

组件代码如下&#xff1a; <template><div:class"[voice-message, { sent: isSent, received: !isSent }]":style"{ backgroundColor: backgroundColor }"click"togglePlayback"><!-- isSent为false在左侧&#xff0c;为true在右…...

ES6 reduce方法详解:示例、应用场景与实用技巧

在JavaScript中&#xff0c;reduce 方法是一个非常强大的数组方法&#xff0c;它允许你将数组中的元素归并&#xff08;reduce&#xff09;为单个值。reduce 方法执行一个由你提供的reducer函数&#xff08;归并函数&#xff09;&#xff0c;将其结果汇总为单一的返回值。 一.…...

java后端保存的本地图片通过ip+端口直接访问

直接上代码吧 package com.ydx.emms.datapro.controller;import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import org.springframework.web.servlet.config.annotation.…...

2024 年高教社杯全国大学生数学建模竞赛B题4小问解题思路(第二版)

原文链接&#xff1a;https://www.cnblogs.com/qimoxuan/articles/18399415 问题 1&#xff1a;抽样检测方案设计 详细解题思路&#xff1a; 确定抽样检测目标&#xff1a;企业需要确定一个可接受的次品率上限&#xff08;标称值&#xff09;&#xff0c;以及在该次品率下&am…...

docker-nginx数据卷挂载

一、案例1-利用Nginx容器部署静态资源 1.1、需求: 创建Nginx容器&#xff0c; 修改nginx容器内的html目录下的index.html文件,查看变化将静态资源部署到nginx的html目录 1.2、修改html目录下的index.html文件,查看变化 因为docker运用得最小化系统环境&#xff0c;解决办法就…...

项目实战 ---- 商用落地视频搜索系统(8)---优化(2)---查询逻辑层优化

目录 背景 技术衡量与方案 一种可实现方案 可实现方案及设计描述 可能存在的问题 一种创新实现方案 方案的改良设计 策略公式 优化的实现 完整代码 代码解释 异常场景的考量 处理方式 运行注意事项 运行结果 结果优化对比与解释 背景 在项目实战 ---- 商用落地…...

山东大学机试试题合集

&#x1f370;&#x1f370;&#x1f370;高分篇已经涵盖了绝大多数的机试考点&#xff0c;由于临近预推免&#xff0c;各校的机试蜂拥而至&#xff0c;我们接下来先更一些各高校机试题合集&#xff0c;算是对前边学习成果的深入学习&#xff0c;也是对我们代码能力的锻炼。加油…...

餐厅食品留样管理系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;窗口负责人管理&#xff0c;窗口员工管理&#xff0c;冰柜管理&#xff0c;排班信息管理&#xff0c;留样食品管理&#xff0c;教育宣传管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统…...

亚马逊运营:如何提高亚马逊销量和运营效率?

不少亚马逊卖家们为了扩大业务规模和提高销量&#xff0c;会创建多个卖家账户来同时运营多个亚马逊店铺。问题是&#xff0c;这种多店铺运营模式并非没有风险——亚马逊运营的一个重要方面就是账户的健康管理。一旦某个账户出现问题&#xff0c;亚马逊的算法就可能会启动关联检…...

设计模式背后的设计原则和思想

设计模式背后的设计原则和思想是一套指导我们如何设计高质量软件系统的准则和方法论。这些原则和思想不仅有助于提升软件的可维护性、可扩展性和可复用性&#xff0c;还能帮助开发团队更好地应对复杂多变的需求。以下是一些核心的设计原则和思想&#xff1a; 1. 设计原则 设计…...

项目总体框架

一.后端&#xff08;包装servlet&#xff09; 使用BaseServlet进行请求的初步处理&#xff08;利用继承进行执行这个&#xff09; 在BaseServlet中 处理请求的类型找到对象的方法&#xff0c;并使用注解找到参数名&#xff0c;执行参数自动注入。 package com.csdn.controlle…...

k8s Prometheus

一、部署 Prometheus kubectl create ns kube-ops# 创建 prometheus-cm.yaml apiVersion: v1 kind: ConfigMap metadata:name: prometheus-confignamespace: kube-ops data:prometheus.yml: |global:scrape_interval: 15s # 表示 prometheus 抓取指标数据的频率&#xff0c;默…...

uniapp 对接腾讯云IM群组成员管理(增删改查)

UniApp 实战&#xff1a;腾讯云IM群组成员管理&#xff08;增删改查&#xff09; 一、前言 在社交类App开发中&#xff0c;群组成员管理是核心功能之一。本文将基于UniApp框架&#xff0c;结合腾讯云IM SDK&#xff0c;详细讲解如何实现群组成员的增删改查全流程。 权限校验…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

《从零掌握MIPI CSI-2: 协议精解与FPGA摄像头开发实战》-- CSI-2 协议详细解析 (一)

CSI-2 协议详细解析 (一&#xff09; 1. CSI-2层定义&#xff08;CSI-2 Layer Definitions&#xff09; 分层结构 &#xff1a;CSI-2协议分为6层&#xff1a; 物理层&#xff08;PHY Layer&#xff09; &#xff1a; 定义电气特性、时钟机制和传输介质&#xff08;导线&#…...

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

苍穹外卖--缓存菜品

1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得&#xff0c;如果用户端访问量比较大&#xff0c;数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据&#xff0c;减少数据库查询操作。 缓存逻辑分析&#xff1a; ①每个分类下的菜品保持一份缓存数据…...

论文浅尝 | 基于判别指令微调生成式大语言模型的知识图谱补全方法(ISWC2024)

笔记整理&#xff1a;刘治强&#xff0c;浙江大学硕士生&#xff0c;研究方向为知识图谱表示学习&#xff0c;大语言模型 论文链接&#xff1a;http://arxiv.org/abs/2407.16127 发表会议&#xff1a;ISWC 2024 1. 动机 传统的知识图谱补全&#xff08;KGC&#xff09;模型通过…...

WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)

一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解&#xff0c;适合用作学习或写简历项目背景说明。 &#x1f9e0; 一、概念简介&#xff1a;Solidity 合约开发 Solidity 是一种专门为 以太坊&#xff08;Ethereum&#xff09;平台编写智能合约的高级编…...

站群服务器的应用场景都有哪些?

站群服务器主要是为了多个网站的托管和管理所设计的&#xff0c;可以通过集中管理和高效资源的分配&#xff0c;来支持多个独立的网站同时运行&#xff0c;让每一个网站都可以分配到独立的IP地址&#xff0c;避免出现IP关联的风险&#xff0c;用户还可以通过控制面板进行管理功…...

API网关Kong的鉴权与限流:高并发场景下的核心实践

&#x1f525;「炎码工坊」技术弹药已装填&#xff01; 点击关注 → 解锁工业级干货【工具实测|项目避坑|源码燃烧指南】 引言 在微服务架构中&#xff0c;API网关承担着流量调度、安全防护和协议转换的核心职责。作为云原生时代的代表性网关&#xff0c;Kong凭借其插件化架构…...

Sklearn 机器学习 缺失值处理 获取填充失值的统计值

💖亲爱的技术爱好者们,热烈欢迎来到 Kant2048 的博客!我是 Thomas Kant,很开心能在CSDN上与你们相遇~💖 本博客的精华专栏: 【自动化测试】 【测试经验】 【人工智能】 【Python】 使用 Scikit-learn 处理缺失值并提取填充统计信息的完整指南 在机器学习项目中,数据清…...