RabbitMQ从原理到实战—基于Golang【万字详解】
文章目录
- 前言
- 一、MQ是什么?
- 优势
- 劣势
- 二、MQ的用途
- 1、应用解耦
- 2、异步加速
- 3、削峰填谷
- 4、消息分发
- 三、RabbitMQ是什么
- 1、AMQP 协议
- 2、RabbitMQ 包含的要素
- 3、RabbitMQ 基础架构
- 四、实战
- 1、Simple模式(即最简单的收发模式)
- 2、Work Queues 模型
- 3、Publish/Subscribe 模型
- 4、Routing 模型
- 5、Topics 模型
前言
最近秋招开始找工作,顺便回顾消息队列并且总结。
一、MQ是什么?
消息队列(Message Queue)是一种在应用程序之间传递消息的通信模式。它通过在发送者和接收者之间建立一个消息队列来实现异步通信和解耦。
在消息队列模式中,发送者(Producer)将消息发送到一个中间件(Message Broker)中的消息队列,而接收者(Consumer)则从该队列中接收和处理消息。这种方式使得发送者和接收者可以独立地进行处理,而无需直接交互,从而实现解耦。发送者和接收者只需要知道如何与消息队列进行通信,而不需要知道彼此的存在。
优势
1. 异步通信:发送者将消息放入队列后即可继续进行其他操作,无需等待接收者的响应。接收者可以在合适的时候从队列中获取消息进行处理,实现了异步通信模式。
2. 解耦:发送者和接收者之间通过消息队列进行通信,彼此之间不直接耦合。发送者只需将消息发送到队列中,而不需要知道消息是如何被处理的。接收者只需从队列中获取消息进行处理,而不需要知道消息的来源。
3. 可靠性传输:消息队列通常提供持久化机制,确保消息在发送和接收过程中不会丢失。即使接收者暂时不可用,消息也会在队列中等待,直到接收者准备好接收为止。
4. 扩展性:消息队列可以支持多个发送者和接收者,实现系统的扩展性和高并发处理能力。
5. 缓冲和削峰填谷:通过将消息缓存到队列中,可以平衡发送者和接收者之间的处理速度差异,从而避免系统过载。
消息队列在分布式系统、微服务架构、异步任务处理、事件驱动架构等场景中被广泛应用。一些常见的消息队列系统包括RabbitMQ、Apache Kafka、ActiveMQ、Amazon SQS等。它们提供了丰富的功能和配置选项,可以根据应用需求选择合适的消息队列实现。
劣势
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?
二、MQ的用途
四个用途
应用解耦:提高系统容错性和可维护性
异步提速:提升用户体验和系统吞吐量
削峰填谷:提高系统稳定性
消息分发:提高系统灵活性
1、应用解耦
应用解耦是指通过使用消息队列等中间件来降低应用程序之间的直接依赖性,从而实现独立开发、部署和升级的能力。通过解耦,每个应用程序可以通过消息队列发送和接收消息,而不需要了解其他应用程序的具体实现细节。通过应用解耦,可以实现系统的松耦合架构,提高系统的可维护性、扩展性和容错性。
没有使用MQ:
- 系统的耦合性越高,容错性就越低,可维护性就越低。
使用 MQ 使得应用间解耦,提升容错性和可维护性。
2、异步加速
异步提速是指通过将耗时的操作转化为异步执行,从而提高系统的响应速度和吞吐量。通过异步处理,应用程序可以在等待某个操作完成的同时继续执行其他任务,而不需要阻塞等待结果返回。
例如,当一个应用程序需要进行网络请求并等待响应时,如果采用同步方式,应用程序会被阻塞,直到响应返回才能继续执行其他任务。而通过异步方式,应用程序可以继续执行其他任务,不需要等待网络请求的结果返回。这样可以提高系统的响应速度,使用户获得更好的体验。
没有使用MQ:
一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
使用MQ:
用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。不需要的等待完成
3、削峰填谷
削峰填谷是一种通过平衡系统负载,减轻峰值压力和填充低谷时的资源利用率的技术。它的目标是在系统负载波动较大的情况下,合理利用资源,确保系统的稳定性和高效性。
没有使用MQ:
使用MQ:
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做填谷。简单来说就是慢慢分发
使用MQ后,可以提高系统稳定性。
4、消息分发
消息分发是一种将消息从发送者传递到接收者的机制,它在异步系统和事件驱动架构中起着重要的作用。消息分发可以实现解耦和灵活性,允许不同组件或模块之间通过消息进行通信,从而实现系统的松耦合和可扩展性。
下面是消息分发的一些关键概念和示例:
发布者(Publisher):发布者是消息分发系统中的发送者,它负责生成并发布消息。发布者将消息发送到消息分发系统,而不需要知道消息的具体接收者。
订阅者(Subscriber):订阅者是消息分发系统中的接收者,它通过订阅特定的消息或消息类型来表明自己对消息的兴趣。当有匹配的消息到达时,消息分发系统会将消息传递给订阅者。
主题(Topic):主题是消息分发系统中用于分类和组织消息的标识符或名称。发布者可以将消息发布到特定的主题,而订阅者可以选择订阅感兴趣的主题。通过主题,可以实现消息的细粒度过滤和选择性订阅。
三、RabbitMQ是什么
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求可能比较低了。
1、AMQP 协议
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。
AMQP三层协议:
Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
Session Layer:中间层,主要负责客户端命会发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。
AMQP组件:
交换器(Exchange):消息代理服务器中用于把消息路由到队列的组件。
队列(queue):用来存储消息的数据结构,位于硬盘或内存中。
绑定(Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。
2、RabbitMQ 包含的要素
生产者:消息队列创建者,发送消息到MQ
消费者:连接到RabbitMQ,订阅到队列上,消费消息,持续订阅和单条订阅
消息:包含有效载荷和标签,有效载荷指要传输的数据,标签描述了有效载荷,并且RabbitMQ用它来决定谁获得消息,消费者只能拿到有效载荷,并不知道生产者是谁
3、RabbitMQ 基础架构
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。是生产者、消费者与RabbitMQ通信的渠道,生产者publish或是消费者subscribe 一个队列都是通过信道来通信的。
信道是建立在TCP连接上的虚拟连接,就是说RabbitMQ在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在RabbitMQ都有一个唯一的ID,保证了信道私有性,对应上唯一的线程使用。
Exchange交换机:message 到达 broker 的第一站**,根据分发规则,匹配查询表中的 routing key,分发消息到queue中去。生产者将消息发送到交换器,有交换器将消息路由到一个或者多个队中。当路由不到时,或返回给生产者或直接丟弃。
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding信息被保存到 exchange 中的查询表中,用于 message 的分发依据
四、实战
RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。
1、Simple模式(即最简单的收发模式)
消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
消费者:
package mainimport ("log""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 声明一个队列queue, err := ch.QueueDeclare("hello", // 队列名false, // 持久性false, // 自动删除false, // 独占false, // 等待服务器确认nil, // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 消费消息msgs, err := ch.Consume(queue.Name, // 队列名"", // 消费者标签true, // 自动确认false, // 独占false, // 不等待服务器确认false, // 参数)if err != nil {log.Fatalf("无法注册消费者:%s", err)}// 处理接收到的消息for msg := range msgs {log.Printf("接收到消息:%s", msg.Body)}
}
上述代码首先建立了与RabbitMQ服务器的连接,然后创建了一个通道和一个名为"heo"的队列。接下来,通过ch.Consume函数注册一个消费者,用于从队列中接收消息。在fo循环中,我们处理接收到的消息,这里只是简单地打印出来。
生产者:
package mainimport ("log""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 声明一个队列queue, err := ch.QueueDeclare("hello", // 队列名false, // 持久性false, // 自动删除false, // 独占false, // 等待服务器确认nil, // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 发送消息body := "Hello, RabbitMQ!"err = ch.Publish("", // 交换机queue.Name, // 队列名false, // 必须发送到队列false, // 不等待服务器确认amqp.Publishing{ContentType: "text/plain",Body: []byte(body),},)if err != nil {log.Fatalf("无法发送消息:%s", err)}log.Printf("消息已发送:%s", body)
}
上述代码与消费者程序类似,首先建立了与RabbitMQ服务器的连接,然后创建了一个通道和一个名为"hello"的队列。接下来,通过ch.Publishi函数向队列发送一条消息。
2、Work Queues 模型
消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关[syncronize]保证一条消息只能被一个消费者使用)。
让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
消费者:
package mainimport ("fmt""log""math/rand""time""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 启动多个消费者并行处理任务for i := 1; i <= 3; i++ {go startConsumer(i, ch)}// 阻塞主进程select {}
}func generateTask(id int) string {time.Sleep(time.Duration(rand.Intn(3)) * time.Second)return fmt.Sprintf("Task %d", id)
}func startConsumer(id int, ch *amqp.Channel) {// 声明一个队列queue, err := ch.QueueDeclare("tasks_queue", // 队列名true, // 持久性false, // 自动删除false, // 独占false, // 等待服务器确认nil, // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 消费任务msgs, err := ch.Consume(queue.Name, // 队列名"", // 消费者标签false, // 手动确认false, // 不等待服务器确认false, // 不使用内置的参数false, // 参数nil, // 参数)if err != nil {log.Fatalf("无法注册消费者:%s", err)}for msg := range msgs {task := string(msg.Body)log.Printf("消费者 %d 接收到任务:%s", id, task)log.Printf("消费者 %d 完成任务:%s", id, task)// 手动确认任务已处理msg.Ack(false)}
}
利用协城启动多个消费者进行消费。
结果如下:
3、Publish/Subscribe 模型
每个消费者监听自己的队列。
生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
在RabbitMQ的Publish/Subscribe模型中,生产者将消息发送到交换机,交换机负责将消息广播给所有绑定到它上面的队列。消费者创建队列并将其绑定到交换机上,从而接收交换机发送的消息。这样,一个消息可以被多个消费者接收。
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!
package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs", // 交换机名称"fanout", // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to declare an exchange")// 发布消息到交换机body := "Hello, RabbitMQ!"err = ch.Publish("logs", // 交换机名称"", // 路由键,留空表示广播给所有队列false, // 是否等待服务器响应false, // 其他属性amqp.Publishing{ContentType: "text/plain",Body: []byte(body),},)failOnError(err, "Failed to publish a message")log.Printf("Message sent: %s", body)
}
连接到RabbitMQ服务器,声明了一个名为"logs"的交换机,并通过调用ch.Publish方法将消息发布到交换机上。
在示例代码中,通过指定交换机名称为"logs",路由键为空字符串,消息将被广播给所有绑定到该交换机的队列。
package mainimport ("fmt""log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs", // 交换机名称"fanout", // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("", // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true, // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to declare a queue")// 将队列绑定到交换机上err = ch.QueueBind(q.Name, // 队列名称"", // 路由键,留空表示接收交换机的所有消息"logs", // 交换机名称false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to bind a queue")// 订阅消息msgs, err := ch.Consume(q.Name, // 队列名称"", // 消费者标识符,留空表示由RabbitMQ自动生成true, // 是否自动应答false, // 是否独占模式(仅限于当前连接)false, // 是否等待服务器响应false, // 其他属性nil, // 其他属性)failOnError(err, "Failed to register a consumer")// 接收消息的goroutinego func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages. To exit press CTRL+C")<-make(chan struct{}) // 阻塞主goroutine
}
它连接到RabbitMQ服务器,声明一个fanout类型的交换机(Exchange),创建一个临时队列,将队列绑定到交换机上,并订阅消息。
在示例代码中,创建的交换机名为"logs",交换机类型为"fanout",表示消息将被广播给所有绑定到该交换机的队列。
消费者创建了一个临时队列,并将其绑定到交换机上,这样交换机就会将消息发送到该队列中。
4、Routing 模型
在fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
1、队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
2、消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
3、Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息
消息生产者将消息发送给交换机按照路由判断,路由是字符串(info)当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息。
生产者
package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_direct", // 交换机名称"direct", // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to declare an exchange")// 从命令行参数获取要发送的路由键和消息内容if len(os.Args) < 3 {log.Fatalf("Usage: %s [info] [message]", os.Args[0])}severity := os.Args[1]message := strings.Join(os.Args[2:], " ")// 发布消息到交换机,并指定路由键err = ch.Publish("logs_direct", // 交换机名称severity, // 路由键false, // 是否等待服务器响应false, // 是否立即将消息写入磁盘amqp.Publishing{ContentType: "text/plain",Body: []byte(message),},)failOnError(err, "Failed to publish a message")log.Printf("Sent message: %s", message)
}
它连接到RabbitMQ服务器,声明一个direct类型的交换机(Exchange),并通过指定路由键将消息发布到交换机。
在示例代码中,创建的交换机名为"logs_direct",交换机类型为"direct",表示消息将根据指定的路由键进行选择性地发送给队列。
生产者从命令行参数获取要发送的路由键和消息内容。路由键可以是任意字符串,用于标识消息的类型或者级别。消息内容可以是任意文本。
消费者
package mainimport ("fmt""log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_direct", // 交换机名称"direct", // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("", // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true, // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to declare a queue")// 从命令行参数获取要绑定的路由键if len(os.Args) < 2 {log.Fatalf("Usage: %s [info] [warning] [error]", os.Args[0])}severities := os.Args[1:]// 将队列绑定到交换机上,并指定要接收的路由键for _, severity := range severities {err = ch.QueueBind(q.Name, // 队列名称severity, // 路由键"logs_direct", // 交换机名称false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to bind a queue")}// 订阅消息msgs, err := ch.Consume(q.Name, // 队列名称"", // 消费者标识符,留空表示由RabbitMQ自动生成true, // 是否自动应答false, // 是否独占模式(仅限于当前连接)false, // 是否等待服务器响应false, // 其他属性nil, // 其他属性)failOnError(err, "Failed to register a consumer")// 接收消息的goroutinego func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages. To exit press CTRL+C")<-make(chan struct{}) // 阻塞主goroutine
}
上述代码实现了一个Routing模型的消费者。它连接到RabbitMQ服务器,声明一个direct类型的交换机(Exchange),创建一个临时队列,并将队列绑定到交换机上,同时指定要接收的路由键。
在RabbitMQ的Routing模型中,生产者将消息发送到交换机,并在发送消息时指定一个路由键(routing key)。交换机根据路由键将消息发送给与之绑定的队列。消费者创建队列并将其绑定到交换机上,并通过指定要接收的路由键来选择性地接收消息。
在示例代码中,创建的交换机名为"logs_direct",交换机类型为"direct",表示消息将根据指定的路由键进行选择性地发送给队列。
消费者创建了一个临时队列,并通过循环将该队列绑定到交换机上,并指定要接收的路由键。路由键可以是任意字符串,用于标识消息的类型或者级别。在示例中,我们通过命令行参数传入要绑定的路由键。
最后,消费者通过调用ch.Consume方法订阅消息。该方法返回一个消息通道msgs,消费者可以从该通道接收到消息。在示例中,我们使用一个goroutine来异步接收消息,并在收到消息时打印出来。
5、Topics 模型
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!
这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
统配符
* 匹配不多不少恰好1个词
# 匹配一个或多个词
如:
fan.# 匹配 fan.one.two 或者 fan.one 等
fan.* 只能匹配 fan.one
生产者
func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_topic", // 交换机名称"topic", // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to declare an exchange")// 定义要发送的消息的路由键和内容routingKey := "example.key.das"message := "Hello, RabbitMQ!"// 发布消息到交换机,并指定路由键err = ch.Publish("logs_topic", // 交换机名称routingKey, // 路由键false, // 是否等待服务器响应false, // 是否立即发送amqp.Publishing{ContentType: "text/plain",Body: []byte(message),},)failOnError(err, "Failed to publish a message")log.Printf("Sent message: %s", message)
}
消费者
func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_topic", // 交换机名称"topic", // 交换机类型true, // 是否持久化false, // 是否自动删除false, // 是否内部使用false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("", // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true, // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to declare a queue")// 将队列绑定到交换机上,并指定要接收的路由键err = ch.QueueBind(q.Name, // 队列名称"example.#", // 路由键,可以使用通配符*匹配多个单词"logs_topic", // 交换机名称false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to bind a queue")// 创建一个消费者通道msgs, err := ch.Consume(q.Name, // 队列名称"", // 消费者标识符,留空表示由RabbitMQ自动生成true, // 是否自动应答false, // 是否排他消费者false, // 是否阻塞false, // 是否等待服务器响应nil, // 其他属性)failOnError(err, "Failed to register a consumer")// 接收和处理消息forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages...")<-forever
}
相关文章:

RabbitMQ从原理到实战—基于Golang【万字详解】
文章目录 前言一、MQ是什么?优势劣势 二、MQ的用途1、应用解耦2、异步加速3、削峰填谷4、消息分发 三、RabbitMQ是什么1、AMQP 协议2、RabbitMQ 包含的要素3、RabbitMQ 基础架构 四、实战1、Simple模式(即最简单的收发模式)2、Work Queues 模型3、Publish/Subscribe…...

机器学习——KNN算法
1、:前提知识 KNN算法是机器学习算法中用于分类或者回归的算法,KNN全称为K nearest neighbour(又称为K-近邻算法) 原理:K-近邻算法采用测量不同特征值之间的距离的方法进行分类。 优点:精度高 缺点&…...

Kali 软件管理测试案例
案例1 :显示目录树 tree ┌──(root㉿kali)-[~] └─# tree --help usage: tree [-acdfghilnpqrstuvxACDFJQNSUX] [-L level [-R]] [-H baseHREF][-T title] [-o filename] [-P pattern] [-I pattern] [--gitignore][--gitfile[]file] [--matchdirs] [--metafirs…...
【分布式】Zookeeper
Java开发者视角下的Zookeeper—— 在什么场景下使用,怎么用 可以参考:https://zhuanlan.zhihu.com/p/62526102 Zookeeper是什么? ZooKeeper 是一个分布式的,开放源码的分布式应用程序协同服务。ZooKeeper 的设计目标是将那些复…...

ScheduleJS Crack,新的“信息列”水平滚动功能
ScheduleJS Crack,新的“信息列”水平滚动功能 增加了对Angular 16的支持 新的“信息列”水平滚动功能。 新的“信息列”固定功能。 添加了输入属性以处理组件模板中的偶数和奇数ScheduleRowPlainBackgroundColor以及CSS变量。 改进了“信息列”和角度甘特组件的类型。 Schedul…...
curl封装
一。由于工作的原因,需要对curl做一些封装,附加上我们的证书,提供给第三个C和jAVA使用。 二。头文件封闭四个函数,get,post,download,upload #ifndef CURLHTTP_H #define CURLHTTP_H#include …...
C语言数据类型和变量
C语言数据类型和变量 数据类型分类内置类型【C语言本身就具有的类型】自定义类型【自己来创建类型】取值范围 变量变量的创建变量创建的语法形式变量的分类全局变量局部变量 栈区、堆区、静态区 算术操作符赋值操作符连续赋值复合赋值符 单目操作符:、--、、-强制类…...

分布式训练 最小化部署docker swarm + docker-compose落地方案
目录 背景: 前提条件: 一、docker环境初始化配置 1. 安装nvidia-docker2 2. 安装docker-compose工具 3. 获取GPU UUID 4. 修改docker runtime为nvidia,指定机器的UUID 二、docker-swarm 环境安装 1. 初始化swarm管理节点 2. 加入工…...

QT学习笔记-开发环境编译Qt MySql数据库驱动与交叉编译Qt MySql数据库驱动
QT学习笔记-开发环境编译Qt MySql数据库驱动与交叉编译Qt MySql数据库驱动 0、背景1、基本环境2、开发环境编译Qt MySql数据库驱动2.1 依赖说明2.2 MySQL驱动编译过程 3、交叉编译Qt MySql数据库驱动3.1 依赖说明3.3.1 如何在交叉编译服务器上找到mysql.h及相关头文件3.3.2 如果…...
QT使用QXlsx实现数据验证与Excel公式操作 QT基础入门【Excel的操作】
准备环境:QT中使用QtXlsx库的三种方法 1、公式操作写单行公式 //右值初始化Format rAlign;rAlign.setHorizontalAlignment(Format::AlignRight);//左值初始化Format lAlign;lAlign.setHorizontalAlignment(Format::AlignLeft);xlsx.write("B3", 40, lAlign);xlsx.wr…...
renrenfast Vue2 打包发布
1、修改 static/config/index-prod.js 文件 // api接口请求地址 window.SITE_CONFIG[baseUrl] http://192.168.1.86:8080/renren-fast; /*** 生产环境*/ ;(function () {window.SITE_CONFIG {};// api接口请求地址window.SITE_CONFIG[baseUrl] http://192.16…...

NoSQL数据库介绍+Redis部署
目录 一、NoSQL概述 1、数据的高并发读写 2、海量数据的高效率存储和访问 3、数据库的高扩展和高可用 二、NoSQL的类别 1、键值存储数据库 2、列存储数据库 3、文档型数据库 4、图形化数据库 三、分布式数据库中的CAP原理 1、传统的ACID 1)、A--原子性 …...

【mindspore学习】环境配置
本次实验搭配的环境是 CUDA 11.6 CUDNN v8.9.4 TensorRT-8.4.1.5 mindspore 2.1.0。 1、配置 Nvidia 显卡驱动 如果原来的主机已经安装了 nvidia 驱动,为避免版本的冲突,建议先清除掉旧的 nvidia驱动 sudo apt-get --purge remove nvidia* sudo apt…...
基于shell脚本对aliyun npm仓库(https://packages.aliyun.com)登录认证
文章目录 基于shell脚本对阿里云npm仓库(https://packages.aliyun.com)登录认证食用人群食用方式 基于shell脚本对阿里云npm仓库(https://packages.aliyun.com)登录认证 食用人群 由于一些安全的原因,某些企业可能会…...
K8s Pod 安全认知:从openshift SCC 到 PSP 弃用以及现在的 PSA
写在前面 简单整理,博文内容涉及: PSP 的由来PSA 的发展PSA 使用认知不涉及使用,用于了解 Pod 安全 API 资源理解不足小伙伴帮忙指正对每个人而言,真正的职责只有一个:找到自我。然后在心中坚守其一生,全心全意,永不停息。所有其它的路都是不完整的,是人的逃避方式,是…...

提高企业会计效率,选择Manager for Mac(企业会计软件)
作为一家企业,良好的财务管理是保持业务运转的关键。而选择一款适合自己企业的会计软件,能够帮助提高会计效率、减少错误和节约时间。在众多的选择中,Manager for Mac(企业会计软件)是一款值得考虑的优秀软件。 首先,Manager for…...

软考:中级软件设计师:信息系统的安全属性,对称加密和非对称加密,信息摘要,数字签名技术,数字信封与PGP
软考:中级软件设计师:信息系统的安全属性 提示:系列被面试官问的问题,我自己当时不会,所以下来自己复盘一下,认真学习和总结,以应对未来更多的可能性 关于互联网大厂的笔试面试,都是需要细心准…...
Vue3中reactive响应式失效的问题
情景阐述 弹窗内部有一个挑选框,要通过请求接口获取挑选框下面可供选择的数据。 这是一个很简单的情境,我立刻有了自己的思路。如果实现搜索,数据较少可以直接用elementplus自带的filter。如果数据较多,就需要传val,…...
lamp
LAMP 环境 指的是在 Linux 操作系统中分别安装 Apache 网页服务器、MySQL 数据库服务器和 PHP 开发服务器,以及一些对应的扩展软件。AMP也支持win操作系统 (sccm 域升级版) LAMP架构是目前成熟的企业网站应用模式之一,指的是协同…...

LeetCode 周赛上分之旅 #42 当 LeetCode 考树上倍增,出题的趋势在变化吗
⭐️ 本文已收录到 AndroidFamily,技术和职场问题,请关注公众号 [彭旭锐] 和 BaguTree Pro 知识星球提问。 学习数据结构与算法的关键在于掌握问题背后的算法思维框架,你的思考越抽象,它能覆盖的问题域就越广,理解难度…...
在鸿蒙HarmonyOS 5中实现抖音风格的点赞功能
下面我将详细介绍如何使用HarmonyOS SDK在HarmonyOS 5中实现类似抖音的点赞功能,包括动画效果、数据同步和交互优化。 1. 基础点赞功能实现 1.1 创建数据模型 // VideoModel.ets export class VideoModel {id: string "";title: string ""…...
线程同步:确保多线程程序的安全与高效!
全文目录: 开篇语前序前言第一部分:线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分:synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分ÿ…...
【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密
在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...

SCAU期末笔记 - 数据分析与数据挖掘题库解析
这门怎么题库答案不全啊日 来简单学一下子来 一、选择题(可多选) 将原始数据进行集成、变换、维度规约、数值规约是在以下哪个步骤的任务?(C) A. 频繁模式挖掘 B.分类和预测 C.数据预处理 D.数据流挖掘 A. 频繁模式挖掘:专注于发现数据中…...

(二)TensorRT-LLM | 模型导出(v0.20.0rc3)
0. 概述 上一节 对安装和使用有个基本介绍。根据这个 issue 的描述,后续 TensorRT-LLM 团队可能更专注于更新和维护 pytorch backend。但 tensorrt backend 作为先前一直开发的工作,其中包含了大量可以学习的地方。本文主要看看它导出模型的部分&#x…...
2023赣州旅游投资集团
单选题 1.“不登高山,不知天之高也;不临深溪,不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...

快速排序算法改进:随机快排-荷兰国旗划分详解
随机快速排序-荷兰国旗划分算法详解 一、基础知识回顾1.1 快速排序简介1.2 荷兰国旗问题 二、随机快排 - 荷兰国旗划分原理2.1 随机化枢轴选择2.2 荷兰国旗划分过程2.3 结合随机快排与荷兰国旗划分 三、代码实现3.1 Python实现3.2 Java实现3.3 C实现 四、性能分析4.1 时间复杂度…...

基于开源AI智能名片链动2 + 1模式S2B2C商城小程序的沉浸式体验营销研究
摘要:在消费市场竞争日益激烈的当下,传统体验营销方式存在诸多局限。本文聚焦开源AI智能名片链动2 1模式S2B2C商城小程序,探讨其在沉浸式体验营销中的应用。通过对比传统品鉴、工厂参观等初级体验方式,分析沉浸式体验的优势与价值…...
LangChain【6】之输出解析器:结构化LLM响应的关键工具
文章目录 一 LangChain输出解析器概述1.1 什么是输出解析器?1.2 主要功能与工作原理1.3 常用解析器类型 二 主要输出解析器类型2.1 Pydantic/Json输出解析器2.2 结构化输出解析器2.3 列表解析器2.4 日期解析器2.5 Json输出解析器2.6 xml输出解析器 三 高级使用技巧3…...