【RabbitMQ】golang客户端教程2——工作队列
任务队列/工作队列
在上一个教程中,我们编写程序从命名的队列发送和接收消息。在这一节中,我们将创建一个工作队列,该队列将用于在多个工人之间分配耗时的任务。
工作队列(又称任务队列)的主要思想是避免立即执行某些资源密集型任务并且不得不等待这些任务完成。相反,我们安排任务异步地同时或在当前任务之后完成。我们将任务封装为消息并将其发送到队列,在后台运行的工作进程将取出消息并最终执行任务。当你运行多个工作进程时,任务将在他们之间共享。
这个概念在Web应用中特别有用,因为在Web应用中不可能在较短的HTTP请求窗口内处理复杂的任务,(译注:例如注册时发送邮件或短信验证码等场景)。
准备工作
在本教程的上一部分,我们发送了一条包含“ Hello World!”的消息。现在,我们将发送代表复杂任务的字符串。我们没有实际的任务,例如调整图像大小或渲染pdf文件,所以我们通过借助time.Sleep
函数模拟一些比较耗时的任务。我们会将一些包含.
的字符串封装为消息发送到队列中,其中每有一个.
就表示需要耗费1秒钟的工作,例如,hello...
表示一个将花费三秒钟的假任务。
我们将稍微修改上一个示例中的send.go
代码,以允许从命令行发送任意消息。该程序会将任务安排到我们的工作队列中,因此我们将其命名为new_task.go
body := bodyFrom(os.Args) // 从参数中获取要发送的消息正文
err = ch.Publish("", // exchangeq.Name, // routing keyfalse, // mandatoryfalse,amqp.Publishing {DeliveryMode: amqp.Persistent,ContentType: "text/plain",Body: []byte(body),})
failOnError(err, "Failed to publish a message")
log.Printf(" [x] Sent %s", body)
下面是bodyFrom
函数:
func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}
我们以前的receive.go
程序也需要进行一些更改:它需要为消息正文中出现的每个.
伪造一秒钟的工作。它将从队列中弹出消息并执行任务,因此我们将其称为worker.go
:
msgs, err := ch.Consume(q.Name, // queue"", // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args
)
failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dot_count := bytes.Count(d.Body, []byte(".")) // 数一下有几个.t := time.Duration(dot_count)time.Sleep(t * time.Second) // 模拟耗时的任务log.Printf("Done")}
}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
请注意,我们的假任务模拟执行时间。
然后,我们就可以打开两个终端,分别执行new_task.go
和worker.go
了。
# shell 1
go run worker.go
# shell 2
go run new_task.go
循环调度
使用任务队列的优点之一是能够轻松并行化工作。如果我们的工作正在积压,我们可以增加更多的工人,这样就可以轻松扩展。
首先,让我们尝试同时运行两个worker.go
脚本。它们都将从队列中获取消息,但是究竟是怎样呢?让我们来看看。
你需要打开三个控制台。其中两个将运行worker.go
脚本。这些控制台将成为我们的两个消费者——C1和C2。
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
在第三个控制台中,我们将发布新任务。启动消费者之后,你可以发布一些消息:
# shell 3
go run new_task.go msg1.
go run new_task.go msg2..
go run new_task.go msg3...
go run new_task.go msg4....
go run new_task.go msg5.....
然后我们在shell1
和 shell2
两个窗口看到如下输出结果了:
# shell 1
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg1.
# => [x] Received a message: msg3...
# => [x] Received a message: msg5.....
# shell 2
go run worker.go
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received a message: msg2..
# => [x] Received a message: msg4....
默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。使用三个或者更多worker
试一下。
消息确认
work
完成任务可能需要耗费几秒钟,如果一个worker
在任务执行过程中宕机了该怎么办呢?我们当前的代码中,RabbitMQ一旦向消费者传递了一条消息,便立即将其标记为删除。在这种情况下,如果你终止一个worker
那么你就可能会丢失这个任务,我们还将丢失所有已经交付给这个worker
的尚未处理的消息。
我们不想丢失任何任务,如果一个worker
意外宕机了,那么我们希望将任务交付给其他worker
来处理。
为了确保消息永不丢失,RabbitMQ支持 消息确认。消费者发送回一个确认(acknowledgement)
,以告知RabbitMQ已经接收,处理了特定的消息,并且RabbitMQ可以自由删除它。
如果使用者在不发送确认的情况下死亡(其通道已关闭,连接已关闭或TCP连接丢失),RabbitMQ将了解消息未完全处理,并将对其重新排队。如果同时有其他消费者在线,它将很快将其重新分发给另一个消费者。这样,您可以确保即使工人偶尔死亡也不会丢失任何消息。
没有任何消息超时;RabbitMQ将在消费者死亡时重新传递消息。即使处理一条消息需要很长时间也没关系。
在本教程中,我们将使用手动消息确认,方法是为“auto-ack”
参数传递一个false,然后在完成任务后,使用d.Ack(false)
从worker
发送一个正确的确认(这将确认一次传递)。
msgs, err := ch.Consume(q.Name, // queue"", // consumerfalse, // 注意这里传false,关闭自动消息确认false, // exclusivefalse, // no-localfalse, // no-waitnil, // args
)
if err != nil {fmt.Printf("ch.Consume failed, err:%v\n", err)return
}// 开启循环不断地消费消息
forever := make(chan bool)
go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)dotCount := bytes.Count(d.Body, []byte("."))t := time.Duration(dotCount)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) // 手动传递消息确认}
}()
使用这段代码,我们可以确保即使你在处理消息时使用CTRL+C
杀死一个worker
,也不会丢失任何内容。在worker
死后不久,所有未确认的消息都将被重新发送。
消息确认必须在接收消息的同一通道(Channel)
上发送。尝试使用不同的通道(Channel)进行消息确认将导致通道级协议异常。
忘记确认
忘记确认是一个常见的错误。这是一个简单的错误,但后果是严重的。当你的客户机退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。
为了调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:
> sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
> ```
>
> 在Windows平台,去掉sudo
>
> ```bash
> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
> ```### 消息持久化我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止运行,我们的任务仍然会丢失。当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。首先,我们需要确保队列能够在RabbitMQ节点重新启动后继续运行。为此,我们需要声明它是持久的:```go
q, err := ch.QueueDeclare("hello", // nametrue, // 声明为持久队列false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments
)
虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello
的队列,它不是持久的。RabbitMQ不允许你使用不同的参数重新定义现有队列,并将向任何尝试重新定义的程序返回错误。但是有一个快速的解决方法——让我们声明一个具有不同名称的队列,例如task_queue
:
q, err := ch.QueueDeclare("task_queue", // nametrue, // 声明为持久队列false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments
)
这种持久的选项更改需要同时应用于生产者代码和消费者代码。
在这一点上,我们确信即使RabbitMQ重新启动,任务队列队列也不会丢失。现在我们需要将消息标记为持久的——通过使用amqp.Publishing
中的持久性选项amqp.Persistent
。
err = ch.Publish("", // exchangeq.Name, // routing keyfalse, // 立即false, // 强制amqp.Publishing{DeliveryMode: amqp.Persistent, // 持久(交付模式:瞬态/持久)ContentType: "text/plain",Body: []byte(body),})
有关消息持久性的说明
将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是RabbitMQ接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口。而且,RabbitMQ并不是对每个消息都执行
fsync(2)
——它可能只是保存到缓存中,而不是真正写入磁盘。持久性保证不是很强,但是对于我们的简单任务队列来说已经足够了。如果您需要更强有力的担保,那么您可以使用publisher confirms
。
公平分发
你可能已经注意到调度仍然不能完全按照我们的要求工作。例如,在一个有两个worker
的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个worker
将持续忙碌,而另一个worker
几乎不做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地发送消息。
这是因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。只是盲目地向消费者发送信息。
为了避免这种情况,我们可以将预取计数设置为1。这告诉RabbitMQ不要一次向一个worker
发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向worker
发送新消息。相反,它将把它发送给下一个不忙的worker
。
err = ch.Qos(1, // prefetch count0, // prefetch sizefalse, // global
)
关于队列大小的说明
如果所有的worker都很忙,你的queue随时可能会满。你会想继续关注这一点,也许需要增加更多的worker,或者有一些其他的策略。
完整的代码实例
我们的new_task.go
的最终代码代入如下:
package mainimport ("github.com/streadway/amqp""log""os""strings"
)func failOnErrorNew(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}
func bodyFrom(args []string) string {var s stringif len(args) < 2 || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], "")}return s
}func main() {//建立连接conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")failOnErrorNew(err, "Failed to connect to RabbitMQ")defer conn.Close()//获取channelch, err := conn.Channel()failOnErrorNew(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("task_queue", //nametrue, //durablefalse, //delete when unusedfalse, //exclusivefalse, //no-waitnil, //arguments)failOnErrorNew(err, "Failed to declare a queue ")body := bodyFrom(os.Args)err = ch.Publish("", // exchangeq.Name, // routing keyfalse, // mandatoryfalse,amqp.Publishing{DeliveryMode: amqp.Persistent, //表示将消息设置为持久模式,确保在 RabbitMQ 重启后消息能够被恢复。ContentType: "text/plain",Body: []byte(body),})failOnErrorNew(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)}
work.go
内容如下:
package mainimport ("bytes""github.com/streadway/amqp""log""time"
)func failOnErrorWork(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}
func main() {//建立连接conn, err := amqp.Dial("amqp://licong:123456@8.130.85.112:5672/")failOnErrorWork(err, "Failed to connect to RabbitMQ")defer conn.Close()//获取channelch, err := conn.Channel()failOnErrorWork(err, "Failed to open a channel")defer ch.Close()//声明队列q, err := ch.QueueDeclare("task_queue", //nametrue, //durablefalse, //delete when unusedfalse, //exclusivefalse, //no-waitnil, //argument)failOnErrorWork(err, "Failed to declare a queue")err = ch.Qos(1, //prefetch count 消费者一次能获取的最大未确认消息数量0, //prefetch size 消费者一次能获取的最大未确认消息的总大小(以字节为单位)false, //global 是否将预取配置应用于所有通道。如果为 true,则应用于所有通道;如果为 false,则仅应用于当前通道。)//获取接收消息的Delivery通道msgs, err := ch.Consume(q.Name, //queue"", //consumerfalse, //注意这里传false,关闭自动消息确认false, //exclusivefalse, //no-localfalse, //no-waitnil, //args)failOnErrorWork(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Reveived a message:%s", d.Body)dot_count := bytes.Count(d.Body, []byte(".")) //数一下有几个t := time.Duration(dot_count)time.Sleep(t * time.Second)log.Printf("Done")d.Ack(false) //手动传递消息确认}}()log.Printf("[*] Waiting for messages. To exit press CTRL+C")<-forever
}
源自:https://www.rabbitmq.com/getstarted.html
相关文章:

【RabbitMQ】golang客户端教程2——工作队列
任务队列/工作队列 在上一个教程中,我们编写程序从命名的队列发送和接收消息。在这一节中,我们将创建一个工作队列,该队列将用于在多个工人之间分配耗时的任务。 工作队列(又称任务队列)的主要思想是避免立即执行某些…...
芯旺微冲刺IPO,车规级MCU竞争白热化下的“隐忧”凸显
在汽车智能化和电动化发展带来的巨大蓝海市场下,产业链企业迎来了一波IPO小高潮。 日前,上海芯旺微电子技术股份有限公司(以下简称“芯旺微”)在科创板的上市申请已经被上交所受理,拟募资17亿元,用于投建车…...

HTML <s> 标签
例子 可以像这样标记删除线文本: 在 HTML 5 中,<s>仍然支持</s>已经不支持这个标签了。 浏览器支持 元素ChromeIEFirefoxSafariOpera<s>YesYesYesYesYes 所有浏览器都支持 <s> 标签。 定义和用法 <s> 标签可定义加…...

微信小程序 - scroll-view组件之上拉加载下拉刷新(解决上拉加载不触发)
前言 最近在做微信小程序项目中,有一个功能就是做一个商品列表分页限流然后实现上拉加载下拉刷新功能,遇到了一个使用scroll-viwe组件下拉刷新事件始终不触发问题,网上很多说给scroll-view设置一个高度啥的就可以解决,有些人设置了…...

rust usize与i64怎么比较大小?
在Rust中, usize 和 i64 是不同的整数类型,它们的位数和表示范围可能不同。因此,直接比较 usize 和 i64 是不允许的。如果需要比较它们的大小,可以将它们转换为相同的类型,然后进行比较。 要将 usize 转换为 i64 &…...

电脑更新win10黑屏解决方法
电脑更新win10黑屏解决方法 电脑黑屏出现原因解决步骤 彻底解决 电脑黑屏 出现原因 系统未更新成功就关机,导致系统出故障无法关机 解决步骤 首先长安电源键10s关机 按电源键开机,出现logo时按F8进入安全模式。 进入自动修复环境后,单击…...

STM32入门——外部中断
中断系统概述 中断:在主程序运行过程中,出现了特定的中断触发条件(中断源),使得CPU暂停当前正在运行的程序,转而去处理中断程序,处理完成后又返回原来被暂停的位置继续运行中断优先级ÿ…...

【计算机网络】NAT及Bridge介绍
OSI七层模型 七层模型介绍及举例 为通过网络将人类可读信息通过网络从一台设备传输到另一台设备,必须在发送设备沿 OSI 模型的七层结构向下传输数据,然后在接收端沿七层结构向上传输数据。 数据在 OSI 模型中如何流动 库珀先生想给帕尔梅女士发一封电…...

封装动态SQL的插件
最近根据公司的业务需要封装了一个简单的动态SQL的插件,要求是允许用户在页面添加SQL的where条件,然后开发者只需要给某个接口写查询对应的表,参数全部由插件进行拼接完成。下面是最终实现: 开发人员只需要在接口写上下面的查询SQ…...

C# Microsoft消息队列服务器的使用 MSMQ
先安装消息队列服务器 private static readonly string path ".\\Private$\\myQueue";private void Create(){if (!MessageQueue.Exists(path)){MessageQueue.Create(path);}}private void Send(){Stopwatch stopwatch new Stopwatch();stopwatch.Start();Message…...

Kafka3.0.0版本——生产者如何提高吞吐量
目录 一、生产者提高吞吐量参数设置二、产者提高吞吐量代码示例 一、生产者提高吞吐量参数设置 batch.size:设置批次大小,默认16klinger.ms:设置等待时间,修改为5-100msbuffer.memory:设置缓冲区大小, 默认…...

js精度丢失的问题
1.js精度丢失的常见问题,从常见的浮点型进行计算,到位数很长的munber类型进行计算都会造成精度丢失的问题, 首先我们看一个问题: 0.1 0.2 ! 0.3 // truelet a 9007199254740992 a 1 a // true那么js为什么会出现精度丢失的问题&…...

C++ 编译预处理
在编译器对源程序进行编译时,首先要由处理器对程序文本进行预处理。预处理器提供了一组编译预处理指令和预处理操作符。预处理指令实际上不是C语言的一部分,它只是用来扩充C程序设计环境。所有的预处理指令在程序中都以“#”来引导,每一条预处…...

备战秋招 | 笔试强化22
目录 一、选择题 二、编程题 三、选择题题解 四、编程题题解 一、选择题 1、在有序双向链表中定位删除一个元素的平均时间复杂度为 A. O(1) B. O(N) C. O(logN) D. O(N*logN) 2、在一个以 h 为头指针的单循环链表中,p 指针指向链尾结点的条件是( ) A. p->ne…...

LeetCode ACM模式——哈希表篇(二)
刷题顺序及思路来源于代码随想录,网站地址:https://programmercarl.com 202. 快乐数 编写一个算法来判断一个数 n 是不是快乐数。 「快乐数」 定义为: 对于一个正整数,每一次将该数替换为它每个位置上的数字的平方和。然后重复…...

hadoop 3.1.3集群搭建 ubuntu20
相关 hyper-v安装ubuntu-20-server hyper-v建立快照 hyper-v快速创建虚拟机-导入导出虚拟机 准备 虚拟机设置 采用hyper-v方式安装ubuntu-20虚拟机和koolshare hostnameiph01192.168.66.20h02192.168.66.21h03192.168.66.22 静态IP 所有机器都需要按需设置 sudo vim /e…...

备忘录模式——撤销功能的实现
1、简介 1.1、概述 备忘录模式提供了一种状态恢复的实现机制,使得用户可以方便地回到一个特定的历史步骤。当新的状态无效或者存在问题时,可以使用暂时存储起来的备忘录将状态复原。当前很多软件都提供了撤销(Undo)操作…...

Golang 函数参数的传递方式 值传递,引用传递
基本介绍 我们在讲解函数注意事项和使用细节时,已经讲过值类型和引用类型了,这里我们再系统总结一下,因为这是重难点,值类型参数默认就是值传递,而引用类型参数默认就是引用传递。 两种传递方式(函数默认都…...

K8s影响Pod调度和Deployment
5.应用升级回滚和弹性伸缩...

透明代理和不透明代理
透明代理和不透明代理 1、透明代理(Transparent Proxy)2、不透明代理(Non-Transparent Proxy)3、工作原理4、透明代理为啥比不透明代理多一部先连接到路由再到代理服务器?5、这里路由器做了什么工作6、代理自动配置文件(Proxy Auto-Configuration file,PAC file)7、代理…...

1424. 对角线遍历 II;2369. 检查数组是否存在有效划分;1129. 颜色交替的最短路径
1424. 对角线遍历 II 核心思想:我感觉是一个技巧题,如果想到很容易做出了,想不到就很难了。首先对于一条对角线的数,其坐标ij是一样的,然后同一条对角线斜向上的j是从小到大的,知道这个就很容易做出来了。…...

【漏洞复现】Metabase 远程命令执行漏洞(CVE-2023-38646)
文章目录 前言声明一、漏洞介绍二、影响版本三、漏洞原理四、漏洞复现五、修复建议 前言 Metabase 0.46.6.1之前版本和Metabase Enterprise 1.46.6.1之前版本存在安全漏洞,未经身份认证的远程攻击者利用该漏洞可以在服务器上以运行 Metabase 服务器的权限执行任意命…...

Linux 9的repo for OVS build
源码中自带RPM包spec文件 cd /root/rpmbuild/SOURCES/openvswitch-2.17.7/rhel rpmbuild -bb openvswitch.spec ## 按提示解决,不好解决的依赖可以试试下面的repo 方法 error: File /root/rpmbuild/SOURCES/openvswitch-2.17.7.tar.gz: No such file or direct…...

DOCTYPE 是什么作用?
聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ DOCTYPE 是什么作用?⭐ 写在最后 ⭐ 专栏简介 前端入门之旅:探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅!这个专栏是为那些对Web开发感兴…...

KubeSphere 3.4.0 发布:支持 K8s v1.26
2023 年 07 月 26 日,KubeSphere 开源社区激动地向大家宣布,KubeSphere 3.4.0 正式发布! 让我们先简单回顾下之前三个大版本的主要变化: KubeSphere 3.1.0 新增了“边缘计算”、“计量计费” 等功能,将 Kubernetes 从…...

自然语言文本分类模型代码
以下是一个基于PyTorch的文本分类模型的示例代码,用于将给定的文本分为多个预定义类别: import torch import torch.nn as nn import torch.nn.functional as Fclass TextClassifier(nn.Module):def __init__(self, vocab_size, embedding_dim, hidden_…...

Prometheus实现系统监控报警邮件
Prometheus实现系统监控报警邮件 简介 Prometheus将数据采集和报警分成了两个模块。报警规则配置在Prometheus Servers上, 然后发送报警信息到AlertManger,然后我们的AlertManager就来管理这些报警信息,聚合报警信息过后通过email、PagerDu…...

could not import go.etcd.io/etcd/clientv3-go
问题描述 今天在封装etcd的时候导包报错: could not import go.etcd.io/etcd/clientv3 (no required module provides package "go.etcd.io/etcd/clientv3") 问题解决: get:确保下载了client包 go get go.etcd.io/etcd/client tidy go mod tidy 本文由 mdnice 多平台…...

MySQL的行锁、表锁触发
MySQL的行锁、表锁触发 sql CREATE TABLE products ( product_id INT PRIMARY KEY, product_name VARCHAR(50), stock INT ); INSERT INTO products (product_id, product_name, stock) VALUES (1001, ‘商品A’, 50), (1002, ‘商品B’, 30), (1003, ‘商品C’, 20); 一、行锁…...

mysql-入门笔记-3
# ----------排序查询-------- # 语法 # select 字段列表 from 表名 order by 字段1 排序方式1 ,字段2 排序方式2 ; DESC 降序 ASC升序 # 1 根据年龄对公司的员工进行升序排序---默认升序-黄色提示代码冗余 select * from userTable order by age ASC ; # 2 根据入职时间,对员…...