RabbitMQ-默认读、写方式介绍
1、RabbitMQ简介
rabbitmq是一个开源的消息中间件,主要有以下用途,分别是:
- 应用解耦:通过使用RabbitMQ,不同的应用程序之间可以通过消息进行通信,从而降低应用程序之间的直接依赖性,提高系统的可维护性、扩展性和容错性。
- 异步提速:通过将耗时的操作转化为异步执行,可以提高系统的响应速度和吞吐量,提升用户体验。
- 削峰填谷:在高峰时段,RabbitMQ可以缓存大量的消息,从而避免系统崩溃,并在低峰时段处理这些消息,提高系统的稳定性。
- 消息分发:RabbitMQ可以将消息分发到多个消费者进行处理,从而提高系统的灵活性和处理能力。
了解rabbitmq的设计架构,对理解mq如何使用有很大的帮助。
一个非常重要的点,mq中的生产者从来不是直接将消息发送到队列中的,而是将消息发送到了mq的交换机中(上图中的exchange为交换机), 甚至生产者都不知道这条消息将被发送到哪个队列中。
交换机是个怎样的设计呢,他的一侧连接生产者,从生产者接收消息,另外一侧连接队列,将消息push进队列中,将消息push进一个队列,还是多个队列,还是抛弃,这些策略是由交换机的类型决定的,对于交换机的使用,后面详细介绍。
2、RabbitMQ安装
rabiitmq的安装,最简单的一种方式为运行mq的docker镜像,一行命令搞定:
# latest RabbitMQ 3.13
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
执行命令后,可以看到如下打印,则代表RabbitMQ启动成功:
镜像启动成功后,可以通过ip:15672打开mq控制台:
http://xxx.xx.xxx.xx:15672/#/
mq安装完成后,下面就可以进行实践啦。
3、默认模式读、写mq
rabbitmq官方的库:github.com/rabbitmq/amqp091-go
生产者侧代码:
package mainimport ("context""fmt""time"amqp "github.com/rabbitmq/amqp091-go"
)func Send(msg string) error {// 连接rabbitmqconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Println("connect error:", err)return err}defer conn.Close()// 创建通道ch, err := conn.Channel()if err != nil {fmt.Println("channel error:", err)return err}defer ch.Close()// 创建队列,使用默认的交换机q, err := ch.QueueDeclare("lp_default", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // noWaitnil, // arguments)if err != nil {fmt.Println("queue declare error:", err)return err}ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()fmt.Println(q.Name)// body := "Hello World!"err = ch.PublishWithContext(ctx,"", // exchange,默认交换机q.Name, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(msg),})if err != nil {fmt.Println("publish error:", err)return err}return nil
}func main() {Send("Hello world")
}
运行上面代码后,可以在rabbitmq的客户端 看到这个队列:
点击队列,进入队列详情:
第一个框中,显式了队列详情,可以看出,这个队列绑定的是默认的交换机。
第二个框,点击后,可以看到队列中的消息详情。
消费者:
package mainimport ("fmt"amqp "github.com/rabbitmq/amqp091-go"
)func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {fmt.Println("connect error:", err)return}defer conn.Close()ch, err := conn.Channel()if err != nil {fmt.Println("Channel error:", err)return}defer ch.Close()q, err := ch.QueueDeclare("lp_default", // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err != nil {fmt.Println("Queue Declare error:", err)return}msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {fmt.Println("Consume error:", err)return}var forever chan struct{}go func() {for d := range msgs {fmt.Printf("Received a message: %s\n", d.Body)}}()fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever
}
代码运行记录:
liupeng@192 default % go run recive.go[*] Waiting for messages. To exit press CTRL+CReceived a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
在以上消费端代码中,如果代码在处理消息的过程中出现异常导致了程序退出,这样正在处理的这条消息就会丢失,为了避免这种情况的发生,rabbitmq设计了消息应答的机制,我们修改上面程序,将auto-ack参数设置为false,当处理完消息后,使用d.Ack(false)发送消息应答。
msgs, err := ch.Consume(q.Name, // queue"", // consumerfalse, // auto-ack,设置为false,取消自动应答false, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err != nil {fmt.Println("Consume error:", err)return}var forever chan struct{}go func() {for d := range msgs {fmt.Printf("Received a message: %s\n", d.Body)d.Ack(false) // 手动应答}}()fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever
如果忘记了进行消息应答,消息会被重新发入调度队列,这样就会吃掉越来越多的内存。
但是,当rabbitmq的服务down掉后,队列中的消息仍然会丢失,为了保证在这种情况下,消息仍然能够不丢失,我们需要做两件事:队列不丢失+消息不丢失,代码如下:
队列持久化:
q, err := ch.QueueDeclare("hello", // nametrue, // durable,设置队列持久化false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments
)
failOnError(err, "Failed to declare a queue")
消息持久化:
将DeliveryMode设置为amqp.Persistent
err = ch.PublishWithContext(ctx,"", // exchange,默认交换机q.Name, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: "text/plain",Body: []byte(msg),DeliveryMode: amqp.Persistent,})
以上就是默认读写rabbitmq的方法,后面再介绍其他几种使用方式。
相关文章:

RabbitMQ-默认读、写方式介绍
1、RabbitMQ简介 rabbitmq是一个开源的消息中间件,主要有以下用途,分别是: 应用解耦:通过使用RabbitMQ,不同的应用程序之间可以通过消息进行通信,从而降低应用程序之间的直接依赖性,提高系统的…...

阿里云百炼大模型使用
阿里云百炼大模型使用 由于阿里云百炼大模型有个新用户福利,有免费的4000000 tokens,我开通了相应的服务试试水。 使用 这里使用Android开发了一个简单的demo。 安装SDK implementation group: com.alibaba, name: dashscope-sdk-java, version: 2.…...
亲测有效,通过接口实现完美身份证号有效性验证+身份证与姓名匹配查询身份实名认证接口(实时)
最近发现一个限时认证的接口分享给大家,有需要的拿去试下吧. 附上部分密钥f478186edba9854f205a130aa888733d227a8f82f98d84b9【剩余约125450次,无时间限制】 b6131281611f6e1fc86c8662f549bdd683a68517203ba312【剩余约1300次,无时段限制】 …...

试题11 输出什么?
...

对vue3/core源码ref.ts文件API的认识过程
对toRef()API的认识的过程: 最开始认识toRef()是从vue3源码中的ref.ts看见的,右侧GPT已经举了例子 然后根据例子,在控制台输出ref对象是什么样子的: 这就是ref对象了,我们根据对象中有没有__v_isRef来判断是不是一个ref对象,当对象存在且__v_isRef true的时候他就判定为是一个…...
AWS迁移与传输之AWS DMS
AWS Database Migration Service(AWS DMS)是一项托管的服务,用于帮助企业将现有的数据库迁移到AWS云中的各种数据库引擎中,或者在不同数据库引擎之间进行数据迁移和同步。直接在线迁移,将数据复制到云端,不…...

【ML Olympiad】预测地震破坏——根据建筑物位置和施工情况预测地震对建筑物造成的破坏程度
文章目录 Overview 概述Goal 目标Evaluation 评估标准 Dataset Description 数据集说明Dataset Source 数据集来源Dataset Fields 数据集字段 Data Analysis and Visualization 数据分析与可视化Correlation 相关性Hierarchial Clustering 分层聚类Adversarial Validation 对抗…...
kafka监控配置和告警配置
Kafka的监控配置和告警配置是确保Kafka集群稳定运行的关键部分。以下是一些关于Kafka监控配置和告警配置的建议: 一、Kafka监控配置 集群级别参数监控: log.retention.hours:用于控制消息在日志中保留的时间。监控此参数的值,确…...

关于智慧校园安全用电监测系统的设计
人生人身安全是大家关注的话题,2019年12月中国消防统计近五年发生在全国学生宿舍的火灾2314起(中国消防2019.12.应急管理部消防救援局官方微博),违规电器是引发火灾的主因。如果在各寝室安装智能用电监测器实时监督线路参数&#…...
Flutter 中的 FormField 小部件:全面指南
Flutter 中的 FormField 小部件:全面指南 在Flutter的世界里,表单是用户输入数据的基本方式之一。FormField是一个强大的小部件,它将表单字段的创建、验证和管理集成到了一个易于使用的抽象中。本文将为您提供一个全面的指南,帮助…...

数据库DCL语句
数据库DCL语句 介绍: DCL英文全称是Data Control Language(数据控制语言),用来管理数据库用户、控制数据库的访 问权限。 管理用户: 查询用户: select * from mysql.user;创建用户: create user 用户名主机名 identified by 密码;修改用…...
mysql-日志管理-error.log
日志管理 默认的数据库日志 vim /etc/my.cnf //错误日志 log-error/usr/local/mysql/mysql.log查看数据库日志 tail -f /usr/local/mysql/mysql.log1 错误日志 :启动,停止,关闭失败报错。rpm安装日志位置 /var/log/mysqld.log #默认开启 2 …...

弱密码系统登录之后强制修改密码
在你登录的时候,获取到弱密码,然后将他存到vuex里面,在登录进去之后,index页面再去取,思路是这样的 一、vuex里面定义密码字段 我是直接在user.js里面写的 import { login, logout, getInfo } from /api/login impo…...
解释Python中的多线程和多进程编程
在Python中,多线程(Multithreading)和多进程(Multiprocessing)是两种常见的并发编程技术,用于同时执行多个任务。然而,由于Python的全局解释器锁(GIL,Global Interpreter…...

【LeetCode】【1】两数之和(1141字)
文章目录 [toc]题目描述样例输入输出与解释样例1样例2样例3 提示进阶Python实现哈希表 个人主页:丷从心 系列专栏:LeetCode 刷题指南:LeetCode刷题指南 题目描述 给定一个整数数组nums和一个整数目标值target,请在该数组中找出…...

【论文速读】|探索ChatGPT在软件安全应用中的局限性
本次分享论文:Exploring the Limits of ChatGPT in Software Security Applications 基本信息 原文作者:Fangzhou Wu, Qingzhao Zhang, Ati Priya Bajaj, Tiffany Bao, Ning Zhang, Ruoyu "Fish" Wang, Chaowei Xiao 作者单位:威…...

部门来了个测试开发,听说是00后,上来一顿操作给我看蒙了...
公司新来了个同事,听说大学是学的广告专业,因为喜欢IT行业就找了个培训班,后来在一家小公司实习半年,现在跳槽来我们公司。来了之后把现有项目的性能优化了一遍,服务器缩减一半,性能反而提升4倍!…...

小程序-修改用户头像
1、调用拍照 / 选择图片 // 修改头像 const onAvatarChange () > { // 调用拍照 / 选择图片 uni.chooseMedia({ // 文件个数 count: 1, // 文件类型 mediaType: [image], success: (res) > { console.log(res) // 本地临时文件路径 (本地路径) const { tempFilePath } …...

PCIe总线-事物层之TLP请求和完成报文格式介绍(六)
1.概述 TLP报文按照类型,可以大致分为4中类型,分别是IO请求报文、存储器请求报文、配置请求报文、完成报文和消息请求报文。IO请求报文可分为IO读请求(不携带数据)和IO写请求(携带数据)。存储器请求报文可…...

从 0 开始实现一个网页聊天室 (小型项目)
实现功能 用户注册和登录好友列表展示会话列表展示: 显示当前正在进行哪些会话 (单聊 / 群聊) , 选中好友列表中的某个好友, 会生成对应的会话实时通信, A给B发送消息, B的聊天界面 / 会话界面能立刻显示新的消息 TODO: 添加好友功能用户头像显示传输图片 / 表情包历史消息搜…...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...
大学生职业发展与就业创业指导教学评价
这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...

MySQL 知识小结(一)
一、my.cnf配置详解 我们知道安装MySQL有两种方式来安装咱们的MySQL数据库,分别是二进制安装编译数据库或者使用三方yum来进行安装,第三方yum的安装相对于二进制压缩包的安装更快捷,但是文件存放起来数据比较冗余,用二进制能够更好管理咱们M…...

MFC 抛体运动模拟:常见问题解决与界面美化
在 MFC 中开发抛体运动模拟程序时,我们常遇到 轨迹残留、无效刷新、视觉单调、物理逻辑瑕疵 等问题。本文将针对这些痛点,详细解析原因并提供解决方案,同时兼顾界面美化,让模拟效果更专业、更高效。 问题一:历史轨迹与小球残影残留 现象 小球运动后,历史位置的 “残影”…...

mac 安装homebrew (nvm 及git)
mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用: 方法一:使用 Homebrew 安装 Git(推荐) 步骤如下:打开终端(Terminal.app) 1.安装 Homebrew…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
关于uniapp展示PDF的解决方案
在 UniApp 的 H5 环境中使用 pdf-vue3 组件可以实现完整的 PDF 预览功能。以下是详细实现步骤和注意事项: 一、安装依赖 安装 pdf-vue3 和 PDF.js 核心库: npm install pdf-vue3 pdfjs-dist二、基本使用示例 <template><view class"con…...

TSN交换机正在重构工业网络,PROFINET和EtherCAT会被取代吗?
在工业自动化持续演进的今天,通信网络的角色正变得愈发关键。 2025年6月6日,为期三天的华南国际工业博览会在深圳国际会展中心(宝安)圆满落幕。作为国内工业通信领域的技术型企业,光路科技(Fiberroad&…...

《信号与系统》第 6 章 信号与系统的时域和频域特性
目录 6.0 引言 6.1 傅里叶变换的模和相位表示 6.2 线性时不变系统频率响应的模和相位表示 6.2.1 线性与非线性相位 6.2.2 群时延 6.2.3 对数模和相位图 6.3 理想频率选择性滤波器的时域特性 6.4 非理想滤波器的时域和频域特性讨论 6.5 一阶与二阶连续时间系统 6.5.1 …...