当前位置: 首页 > article >正文

消息队列RabbitMQ的配置操作及使用

一、RabbitMQ的体系结构RabbitMQ是一个基于AMQPAdvanced Message Queuing Protocol高级消息队列协议实现的开源消息中间件主要用于在分布式系统中存储和转发消息。它由Erlang语言编写以高性能、高可用性以及高扩展性而著称。官网地址RabbitMQ: One broker to queue them all | RabbitMQ1.1 基本架构设计①生产者Producer消息的发送方负责产生并发送消息到 rabbitMQ 服务端②交换机Exchange消息的分发中心负责将接收到的消息路由到一个或多个队列直连交换机Direct Exchange将消息路由到与消息中的路由键完全匹配的队列主题交换机Topic Exchange根据通配符匹配路由键将消息路由到相应队列扇出交换机Fanout Exchange将消息广播到与交换机绑定的所有队列头部交换机Headers Exchange根据消息头中的属性匹配到相应队列③队列消息的存储区用于存储生产者发送的消息④消费者消息的接收方负责从队列中获取消息并进行处理⑤绑定交换机和队列之间的关联关系。生产者将消息发送给交换机队列通过绑定交换机从 而接收消息⑥虚拟主机RabbitMQ 的基本工作单元每个虚拟主机拥有独立的用户、队列、交换机等资源⑦连接连接是指生产者、消费者与 RabbitMQ 之间的网络连接。每个连接可以包含多个信道 (Channel)每个信道是一个独立的会话通道可以进行独立的消息传递1.2 消息中间件的作用解耦降低应用程序之间的直接依赖性从而实现独立开发、部署和升级的能力异步可以将长时间的处理任务放入消息队列中异步处理从而提升响应速度削峰通过平衡系统负载来减轻峰值压力和填充低谷时的资源利用率1.3 常用消息队列对比二、基于 docker 安装 RabbitMQ2.1 拉取镜像docker pull rabbitmq // 如果需要包含管理插件的镜像可以拉取带有-management标签的镜像 docker pull rabbitmq:3.8-management2.2 运行镜像docker run -d --name rabbitmq --restartalways \ -p 5672:5672 -p 15672:15672 \ rabbitmq:management2.3 查看安装结果浏览器中输入 http:///127.0.0.1:15672进入rabbitMQ 管理界面默认用户名和密码都是guest注意如果是在阿里云环境中运行需要将5672和15672端口添加到安全组三、rabbitMQ 工作模式官网介绍了7中工作模式如下3.1 Work Queues生产者将消息发送到默认交换机再推送到自定义队列多个消费者监听同一个队列时谁先抢到消息算谁的。当生产者只有一个且消费者只有一个时就是“hello world”模式即简单模式。3.2 发布订阅模式publish/Subscribe需要创建一个fanout类型的交换机无需设置路由键交换机将消息广播到与其绑定的所有队列中再由消费者进行消费。3.3 路由模式routing需要创建一个 direct 类型的交换机并设置路由键将消息发送到特定的队列中然后由消费者消费消息。3.4 主题模式topics需要创建一个 topic 类型的交换机并设置用通配符表示的路由键将消息发送到匹配的队列中。匹配规则* 表示匹配一个词# 表示匹配0个或多个词四、基于 golang 使用 RabbitMQ在go语言中使用 RabbitMQ需引用第三方包github.com/rabbitmq/amqp091-gogo get https://github.com/rabbitmq/amqp091-go4.1 连接 rabbitMQ 服务器// 连接 rabbitMQ 服务器延迟关闭 conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/) if err ! nil { panic(err) } defer conn.Close()4.2 创建连接管道 channel// 创建channel ch, err : conn.Channel() if err ! nil { panic(err) } defer ch.Close()4.3 创建交换机 exchange// 创建交换机 err ch.ExchangeDeclare( direct.test.go, // 交换机名称 direct, // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部交换机 false, // 是否等待 nil, // 其他参数 )4.4 创建队列 queue// 创建队列 q, err : ch.QueueDeclare( queue.direct.test.go, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 nil, // 其他参数 ) if err ! nil { panic(err) }4.5 绑定交换机与队列// 绑定交换机与队列 ch.QueueBind( q.Name, // 队列名称 test, // 路由键 direct.test.go, // 交换机名称 false, // 是否等待 nil, // 其他参数 )4.6 发布消息// 创建超时context ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 推送消息 body : Hello World!88888 ch.PublishWithContext(ctx, direct.test.go, // 交换机名称 test, // 路由键 false, // 是否等待 false, // 是否立即 amqp.Publishing{ ContentType: text/plain, Body: []byte(body), }, )4.7 消费消息// 消费消息 msgs, err : ch.Consume( queue.direct.test.go, // 队列名称 , // 消费者标识 false, // 是否自动确认 false, // 是否排他 false, // 是否等待 false, // 其他参数 nil, // 其他参数 ) // 取出管道中的消息并打印 for msg : range msgs { fmt.Printf(消费到消息内容%s \n, string(msg.Body)) msg.Ack(false) }生产者发送消息完整代码package main import ( context fmt time amqp github.com/rabbitmq/amqp091-go ) func main() { // 连接 rabbitMQ 服务器延迟关闭 conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/) if err ! nil { panic(err) } defer conn.Close() // 创建channel ch, err : conn.Channel() if err ! nil { panic(err) } defer ch.Close() // 创建交换机 err ch.ExchangeDeclare( direct.test.go, // 交换机名称 direct, // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部交换机 false, // 是否等待 nil, // 其他参数 ) // 创建队列 q, err : ch.QueueDeclare( queue.direct.test.go, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 nil, // 其他参数 ) if err ! nil { panic(err) } // 绑定交换机与队列 ch.QueueBind( q.Name, // 队列名称 test, // 路由键 direct.test.go, // 交换机名称 false, // 是否等待 nil, // 其他参数 ) // 创建超时context ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 推送消息 body : Hello World!88888 ch.PublishWithContext(ctx, direct.test.go, // 交换机名称 test, // 路由键 false, // 是否等待 false, // 是否立即 amqp.Publishing{ ContentType: text/plain, Body: []byte(body), }, ) fmt.Printf(推送消息%s, body) }消费者端接收消息完整代码package main import ( fmt amqp github.com/rabbitmq/amqp091-go ) func main() { // 连接 rabbitMQ 服务器延迟关闭 conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/) if err ! nil { panic(err) } defer conn.Close() // 创建channel ch, err : conn.Channel() if err ! nil { panic(err) } defer ch.Close() // 消费消息 如果生产者端或手动建立了交换机和队列及绑定关系则消费者端可直接根据队列名消费消息而无需再创建交换机、队列等 msgs, err : ch.Consume( queue.direct.test.go, // 队列名称 , // 消费者标识 false, // 是否自动确认 false, // 是否排他 false, // 是否等待 false, // 其他参数 nil, // 其他参数 ) // 取出管道中的消息并打印 for msg : range msgs { fmt.Printf(消费到消息内容%s \n, string(msg.Body)) msg.Ack(false) } }在队列 queque.direct.test.go 中可查看到消息内容五、RabbitMQ 进阶篇5.1 消息的可靠性投递三种消息丢失场景①生产者发送消息到rabbitMQ服务的过程中出现丢失②rabbitMQ服务器进行消息持久化的过程中出现丢失比如服务宕机重启③消费者拉取信息时存在网络波动等导致消息丢失或消息者处理消息异常导致丢失消息丢失的解决方案①消息确认机制针对生产者Confirm 模式是 RabbitMQ 提供的一种消息可靠性保障机制。当生产者通过 Confirm 模式发送消息时它会等待 RabbitMQ 的确认确保消息已经被正确地投递到了指定的 Exchange 中。消息正确投递到 queue 时会返回 ack。消息没有正确投递到 queue 时会返回 nack。如果 exchange 没有绑定 queue也会出现消息丢失使用方法:生产者通过 confirm.select 方法将 Channel 设置为 Confirm 模式发送消息后通过添加 add confirm listener 方法监听消息的确认状态。②消息持久化机制针对rabbitMQ服务器持久化机制是指将消息存储到磁盘以保证在 RabbitMQ 服务器宕机或重启时消息不会丢失。使用方法:生产者通过将消息的 delivery mode 属性设置为 2将消息标记为持久化。队列也需要进行持久化设置确保队列在 RabbitMQ 服务器重启后仍然存在。经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。注意事项:持久化机制会影响性能因此在需要确保消息不丢失的场景下使用③ACK事务机制针对消费者ACK 事务机制用于确保消息被正确消费。当消息被消费者成功处理后消费者发送确认(ACK)给RabbitMQ告知消息可以被移除。这个过程是自动处理的也可以关闭进行手工发送 ACK。使用方法:在 RabbitMQ 中ACK 机制默认是开启的。当消息被消费者接收后会立即从队列中删除除非消费者发生异常。可以手动开启 ACK 机制通过将 auto_ack 参数设置为 False手动控制消息的 ACK注意事项:ACK 机制可以确保消息不会被重复处理但如果消费者发生异常或者未发送 ACK消息可能会被重复投递三种重复消费场景①生产者重复发送同一条消息②rabbit服务器消费者消费消息后没来得及发送ACK消息rabbit服务器挂掉MQ认为消息还未被消费当MQ重启后会继续推送这条消息③消费者消费者处理完消息没来得及发送ACK确认消息消费者挂掉。MQ认为消息还未被消费当消费者重启后会再次接收到这条消息重复消费的解决方案①使用数据库唯一约束局限性大②插入消费记录根据消息ID将消息先插入数据库插入成功后给rabbitMQ返回ACK确认消息。消费者处理完业务则增加标记表示消息已处理成功如果处理失败则记录失败次数及原因已提醒管理人员进行手动处理。消息堆积的原因消息堆积的解决方案①优化消费者性能增加消费者数量②增加队列的容量以存储更多的消息③将无法处理的消息转移到死信队列④将大消息分割为小消息提高处理速度⑤简化消费端业务处理逻辑⑥控制生产者发送消息的速度⑦设置消息优先级优先处理高优先级的消息5.2 消费端限流通过设置rabbitMQ的prefetch count参数可以控制服务器一次投递给消费者的消息数量以适应消费者处理消息的速率避免大量消息都投递到消费者。// 消费者端代码 //设置消费端限流 rabbitmq未收到ack消息时只投递1条消息到消费者 err ch.Qos( 1, // prefetch count 0, // prefetch size false, // global )5.3 消息超时在 RabbitMQ 中设置消息或队列的“超时”即自动过期/消失主要有两种方式都通过 x-message-ttl 参数来实现。当消息在队列中停留的时间超过这个值且未被消费者确认ACKRabbitMQ 会将该消息标记为“死信”Dead Letter或直接丢弃。优先级如果队列设置了 TTL消息也设置了 TTL取两者中较短的那个队列级 TTL该队列中的所有消息都需要相同的过期时间在声明队列 (QueueDeclare) 时通过 Arguments 参数设置。// 创建队列 q, err : ch.QueueDeclare( queue.direct.test.ttl2, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 map[string]any{ // 其他参数 amqp.QueueMessageTTLArg: 30000, // 队列中消息过期时间单位是毫秒 }, )消息级 TTL同一个队列中不同消息需要不同的过期时间例如VIP 用户消息保留 1 小时普通用户保留 5 分钟。在发布消息 (Publish) 时通过 Publishing 属性的 Expiration 字段设置。body : fmt.Sprintf(Hello World! %d, i) ch.PublishWithContext(ctx, direct.test.go, // 交换机名称 test, // 路由键 false, // 是否等待 false, // 是否立即 amqp.Publishing{ ContentType: text/plain, Body: []byte(body), Expiration: 5000, // 发布消息时可以设置该条消息的过期时间与队列的消息过期时间参数对比取较短的过期时间 }, )5.4 死信队列死信队列是 RabbitMQ 中一种特殊的机制用于接收那些无法正常被消费的消息。产生的原因消息被拒绝消息过期队列满了使用方法x-dead-letter-exchange指定死信消息要转发到的交换机名称x-dead-letter-routing-key指定死信消息转发时的路由键// 创建队列 q, err : ch.QueueDeclare( queue.direct.test.ttl2, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 map[string]any{ // 其他参数 amqp.QueueMessageTTLArg: 30000, // 队列中消息过期时间 x-dead-letter-exchange: exchange.direct.dead.leter, // 过期后发往死信队列绑定的交换机 x-dead-letter-routing-key: routing.key.dead.leter, // 指定路由键 }, )应用场景订单超时自动取消异常消息隔离与报警消息重试机制5.5 延迟队列延迟队列是消息中间件中一种特殊的场景指消息被发送后不会立即被消费者消费而是需要在指定的时间之后才能被消费。实现方式方案一正常队列 TTL 死信队列方案二rabbitmq_delayed_message_exchange 插件应用场景5.6 惰性队列尽可能将消息直接存储在磁盘上只在消费者请求时才将少量消息加载到 RAM 中。内存占用极低且稳定能轻松处理百万级甚至亿级的消息堆积不会因内存爆炸而宕机。使用方法创建队列时配置 x-queue-mode 参数为 lazy。// 创建队列 q, err : ch.QueueDeclare( queue.direct.test.ttl3, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 map[string]any{ // 其他参数 x-queue-mode: lazy, // 关键配置开启惰性模式 }, )使用场景5.7 优先级队列优先级队列允许你在发送消息时给每条消息分配一个优先级0-255。当消费者从队列中取消息时RabbitMQ 会优先投递优先级高的消息而不是严格按照“先进先出”FIFO的顺序。使用方法在 RabbitMQ 中优先级队列不是默认开启的需要在声明队列时设置最大优先级参数 x-max-priority。数值越大优先级越高。创建队列时配置 x-max-priority 参数发送消息时设置消息的优先级 Priority。// 创建队列 q, err : ch.QueueDeclare( queue.direct.test.ttl3, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 map[string]any{ // 其他参数 x-max-priority: int32(10), // 设置最大优先级为 10 }, ) // 发送消息 ch.PublishWithContext(ctx, direct.test.go, // 交换机名称 test, // 路由键 false, // 是否等待 false, // 是否立即 amqp.Publishing{ ContentType: text/plain, Body: []byte(body), Priority: 9, // 设置消息优先级值越大优先级越高 }, )应该场景优先级队列缺点内存消耗剧增只有“相对”优先没有“绝对”插队六、RabbitMQ 集群篇6.1 集群搭建环境说明 windows wsl 系统中运用 docker compose 搭建 rabbitmq 集群6.1.1 创建相关目录目录结构如下: rabbitmq-cluster/ ├── docker-compose.yml ├── entrypoint.sh └── data/ # (可选) 用于持久化数据脚本运行后会自动生成子文件夹6.1.2 创建自动加入集群的 shell 脚本 entrypoint.sh创建 entrypoint.sh 文件用于将 rabbitmq 节点自动加入到集群创建完该文件后需要确保脚本有执行权限执行命令 chmod x entrypoint.sh#!/bin/bash set -e # 配置变量 COOKIE_VAL${RABBITMQ_ERLANG_COOKIE:-secret_cookie_123} COOKIE_FILE/var/lib/rabbitmq/.erlang.cookie MY_HOST$(hostname) MASTER_NODErabbitrabbitmq1 echo [$(date)] 启动节点: $MY_HOST # 1. 统一 Erlang Cookie (集群通信的关键) if [ ! -f $COOKIE_FILE ]; then echo 写入 Erlang Cookie... echo $COOKIE_VAL $COOKIE_FILE chmod 600 $COOKIE_FILE chown rabbitmq:rabbitmq $COOKIE_FILE fi # 2. 判断是否为主节点 (rabbitmq1) if [ $MY_HOST rabbitmq1 ]; then echo [$(date)] 检测到是主节点 (rabbitmq1)直接启动服务... exec rabbitmq-server fi # 3. 非主节点逻辑后台启动 - 等待主节点 - 加入集群 - 前台运行 echo [$(date)] 检测到是从节点准备加入集群 $MASTER_NODE ... # 启动 RabbitMQ 为后台守护进程 rabbitmq-server -detached # 等待本地服务完全就绪 echo [$(date)] 等待本地服务就绪... until rabbitmqctl status /dev/null 21; do sleep 2 done # 等待主节点可连接 (防止主节点还没起好就尝试加入) echo [$(date)] 等待主节点 $MASTER_NODE 响应... while ! rabbitmqctl ping -n $MASTER_NODE /dev/null 21; do echo 主节点未就绪等待 2 秒... sleep 2 done # 执行集群加入操作 echo [$(date)] 执行加入集群操作... rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster $MASTER_NODE rabbitmqctl start_app echo [$(date)] 成功加入集群重启进程以前台模式运行... # 停止后台进程以便 exec 接管容器保持存活 rabbitmqctl stop # 4. 以前台模式正式运行 (此时已属于集群) exec rabbitmq-server6.1.3 创建 docker-compose.ymlservices: # --- 节点 1 (主节点/种子节点) --- rabbitmq1: image: rabbitmq:3.13-management-alpine hostname: rabbitmq1 container_name: rabbitmq1 environment: # 基础认证 - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin # 集群通信密钥 (所有节点必须一致) - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15673:15672 # 管理界面 - 5673:5672 # AMQP 端口 volumes: - ./data/rabbitmq1:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always # --- 节点 2 --- rabbitmq2: image: rabbitmq:3.13-management-alpine hostname: rabbitmq2 container_name: rabbitmq2 environment: - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15674:15672 - 5674:5672 volumes: - ./data/rabbitmq2:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] depends_on: rabbitmq1: condition: service_healthy # 确保节点 1 健康后再启动节点 2 networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always # --- 节点 3 --- rabbitmq3: image: rabbitmq:3.13-management-alpine hostname: rabbitmq3 container_name: rabbitmq3 environment: - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15675:15672 - 5675:5672 volumes: - ./data/rabbitmq3:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] depends_on: rabbitmq1: condition: service_healthy rabbitmq2: condition: service_healthy networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always networks: rabbitmq_net: driver: bridgeservices: # --- 节点 1 (主节点/种子节点) --- rabbitmq1: image: rabbitmq:3.13-management-alpine hostname: rabbitmq1 container_name: rabbitmq1 environment: # 基础认证 - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin # 集群通信密钥 (所有节点必须一致) - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15673:15672 # 管理界面 - 5673:5672 # AMQP 端口 volumes: - ./data/rabbitmq1:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always # --- 节点 2 --- rabbitmq2: image: rabbitmq:3.13-management-alpine hostname: rabbitmq2 container_name: rabbitmq2 environment: - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15674:15672 - 5674:5672 volumes: - ./data/rabbitmq2:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] depends_on: rabbitmq1: condition: service_healthy # 确保节点 1 健康后再启动节点 2 networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always # --- 节点 3 --- rabbitmq3: image: rabbitmq:3.13-management-alpine hostname: rabbitmq3 container_name: rabbitmq3 environment: - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15675:15672 - 5675:5672 volumes: - ./data/rabbitmq3:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] depends_on: rabbitmq1: condition: service_healthy rabbitmq2: condition: service_healthy networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always networks: rabbitmq_net: driver: bridge6.1.4 启动集群并查看相关日志# 启动集群 docker compose up -d # 查看日志 docker logs rabbitmq2或者查看节点的集群状态Running Nodes 显示三个节点表示创建集群成功# 查看集群状态 docker exec -it rabbitmq1 rabbitmqctl cluster_status6.1.5 访问网页管理端查看集群状态访问任意节点的管理端可以查看到三个节点如图所示# 访问管理端用户名及密码都是admindocker-compose.yml中配置的 http://127.0.0.1:156736.2 Quorum 队列6.2.1 概念Quorum Queue仲裁队列只能在 RabbitMQ 集群Cluster环境中创建和使用创建队列后会在集群中的所有节点都创建副本其中一个节点被选举为 Leader负责处理读写请求 另外的节点作为 Follower同步存储数据副本。采用 “多数派写入” 机制当消息写入时必须由集群中超过半数的节点确认成功后才视为写入成功并返回给生产者。当消费者成功处理消息并发送 ACK 确认后该消息会从集群中所有节点Leader 和所有 Follower上同时删除。6.2.2 特点6.3.3 使用方法在任意一个节点上创建队列type 选择 Quorum即可创建仲裁队列。6.3 Stream 队列RabbitMQ Stream Queue流队列是 RabbitMQ 从 3.9 版本引入的一种全新队列类型专为高吞吐、大数据量、日志类场景设计。 它的核心设计理念借鉴了 Apache Kafka将 RabbitMQ 从一个传统的“即时消费”消息代理扩展为支持“日志回放”和“无限存储”的流处理平台。机制消息像写日志一样按顺序追加到磁盘文件中。区别经典/仲裁队列消息被消费并 ACK 后立即删除。流队列消息被消费后不会删除而是永久保留直到达到配置的保留策略如时间或大小限制。优势支持历史消息回放。新消费者加入时可以从头开始读取或者从任意时间点Offset开始读取。6.4 基于 go 语言使用集群消息队列的方法6.4.1 创建连接func connectToCluster(nodes []string) (*amqp.Connection, error) { var lastErr error // 遍历所有节点尝试连接 for _, nodeUrl : range nodes { fmt.Printf(正在尝试连接节点: %s ...\n, nodeUrl) conn, err : amqp.Dial(nodeUrl) if err nil { // 连接成功 fmt.Printf(✅ 成功连接到节点: %s\n, nodeUrl) return conn, nil } lastErr err log.Printf(❌ 连接节点 %s 失败: %v, nodeUrl, err) // 可选可以在这里加一个极短的延时避免瞬间风暴 time.Sleep(100 * time.Millisecond) } // 如果所有节点都试过了还是失败返回最后一个错误 return nil, fmt.Errorf(无法连接到集群中的所有节点最后错误: %w, lastErr) }6.4.2 创建仲裁队列需要指定队列的类型为仲裁队列x-queue-type quorum// 创建仲裁队列 q, err : ch.QueueDeclare( queue.direct.test.cluster, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 amqp.Table{ amqp.QueueTypeArg: amqp.QueueTypeQuorum, // 重点集群使用仲裁队列 }, )6.4.3 完整代码示例package main import ( context fmt log time amqp github.com/rabbitmq/amqp091-go ) // 配置集群地址用逗号分隔多个节点 // 格式amqp://用户名:密码节点1:端口节点2:端口节点3:端口/虚拟主机 var clusterUrls []string{ amqp://admin:admin127.0.0.1:5673/, amqp://admin:admin127.0.0.1:5674/, amqp://admin:admin127.0.0.1:5675/, } func main() { // 启动重连循环 reconnectLoop() } func connectToCluster(nodes []string) (*amqp.Connection, error) { var lastErr error // 遍历所有节点尝试连接 for _, nodeUrl : range nodes { fmt.Printf(正在尝试连接节点: %s ...\n, nodeUrl) conn, err : amqp.Dial(nodeUrl) if err nil { // 连接成功 fmt.Printf(✅ 成功连接到节点: %s\n, nodeUrl) return conn, nil } lastErr err log.Printf(❌ 连接节点 %s 失败: %v, nodeUrl, err) // 可选可以在这里加一个极短的延时避免瞬间风暴 time.Sleep(100 * time.Millisecond) } // 如果所有节点都试过了还是失败返回最后一个错误 return nil, fmt.Errorf(无法连接到集群中的所有节点最后错误: %w, lastErr) } func reconnectLoop() { var conn *amqp.Connection var ch *amqp.Channel var notifyClose chan *amqp.Error // 重连间隔 reconnectInterval : 5 * time.Second for { // 1. 尝试连接集群 // Dial 会依次尝试 urls 中的地址直到成功或全部失败 // 注意amqp091-go 的 Dial 通常只接受一个 URL 字符串 fmt.Printf(正在尝试连接 RabbitMQ 集群: %s\n, clusterUrls) c, err : connectToCluster(clusterUrls) if err ! nil { log.Printf(连接集群失败: %v, %s 后重试..., err, reconnectInterval) time.Sleep(reconnectInterval) continue } conn c notifyClose make(chan *amqp.Error) conn.NotifyClose(notifyClose) // 2. 创建 Channel channel, err : conn.Channel() if err ! nil { log.Printf(创建 Channel 失败: %v, err) conn.Close() time.Sleep(reconnectInterval) continue } ch channel fmt.Println(✅ 成功连接到 RabbitMQ 集群并创建 Channel) // 3. 在这里执行业务逻辑 (发布/消费) // 建议将 ch 传递给具体的业务协程 doWork(ch) // 4. 等待连接关闭通知 (阻塞) err -notifyClose log.Printf(连接断开: %v, err) // 清理资源 if ch ! nil { ch.Close() } if conn ! nil { conn.Close() } // 5. 等待一段时间后重试 time.Sleep(reconnectInterval) } } func doWork(ch *amqp.Channel) { // 模拟业务运行 // 在实际应用中这里会启动消费者协程或生产者循环 // 它们会使用传入的 ch // 创建交换机 err : ch.ExchangeDeclare( direct.test.cluster, // 交换机名称 direct, // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部交换机 false, // 是否等待 nil, // 其他参数 ) if err ! nil { fmt.Println(err) } // 创建队列 q, err : ch.QueueDeclare( queue.direct.test.cluster, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 amqp.Table{ amqp.QueueTypeArg: amqp.QueueTypeQuorum, // 重点集群使用仲裁队列 }, ) if err ! nil { panic(err) } // 绑定交换机与队列 ch.QueueBind( q.Name, // 队列名称 test, // 路由键 direct.test.cluster, // 交换机名称 false, // 是否等待 nil, // 其他参数 ) // 创建超时context ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 推送消息 for i : range 30 { body : fmt.Sprintf(Hello World! rabbitmq cluster %d, i) ch.PublishWithContext(ctx, direct.test.cluster, // 交换机名称 test, // 路由键 false, // 是否等待 false, // 是否立即 amqp.Publishing{ ContentType: text/plain, Body: []byte(body), }, ) fmt.Printf(推送消息%s\n, body) time.Sleep(1 * time.Second) } }参考视频消息中间件夺命连环18问一口气刷完面试必问的消息中间件面试内容让你面试少走99%的弯路_哔哩哔哩_bilibili

相关文章:

消息队列RabbitMQ的配置操作及使用

一、RabbitMQ的体系结构 RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol,高级消息队列协议)实现的开源消息中间件,主要用于在分布式系统中存储和转发消息。它由Erlang语言编写,以高性能、高可用性以及高扩…...

(论文)一种基于部分欺骗音频检测的基于临时深度伪造位置方法的高效嵌入

AN EFFICIENT TEMPORARY DEEPFAKE LOCATION APPROACH BASED EMBEDDINGS FOR PARTIALLY SPOOFED AUDIO DETECTION摘要:部分伪造音频检测是一项具有挑战性的任务,在于需要在帧级别上准确地定位音频的真实性。时间性深度伪造定位( TDL )可有效地捕获特征和位…...

AE函数讲解大全 附带下载链接

Adobe After Effects(AE)简介Adobe After Effects 是一款由 Adobe 公司开发的专业动态图形和视觉效果合成软件,广泛应用于影视后期、广告制作、动画设计等领域。它支持图层式的非线性编辑,可实现复杂的特效合成、运动追踪、3D 渲染…...

从 transactional contract 读懂 ABAP 事务边界:RAP、Controlled SAP LUW 与一致性设计实践

在 SAP 新一代开发模型里,transactional contract 并不是一个只存在于文档角落里的术语,它实际上定义了 ABAP 代码在事务运行过程中能做什么、不能做什么。这个机制的意义,不只是限制开发者的自由,而是把事务一致性从靠经验推进到靠框架与规则共同保障。SAP 官方将它定义为…...

把 Test Seam 用明白:ABAP Unit 中隔离依赖、驯服遗留代码的实战指南

在日常的 ABAP 开发里,真正让单元测试变难的,往往不是断言怎么写,而是生产代码里那些甩不掉的外部依赖:数据库读写、权限校验、对象实例化、甚至某些系统状态判断。一旦这些依赖直接写死在业务逻辑中,测试就会变得脆弱、缓慢,而且高度依赖运行环境。Test Seam 存在的意义…...

读懂 SAP 中的 tuning object:把性能优化从业务对象中解耦出来

在很多 ABAP 项目里,开发人员一谈性能优化,脑海里浮现的往往是 SQL Trace、索引、Hint 或者代码重写。可是在 SAP 官方的数据建模体系里,还存在一类很容易被忽略、却非常有工程价值的对象,那就是 tuning object。它并不直接承载业务语义,也不是拿来定义字段、关联和行为逻…...

STM32N6570-DK识别STLINK问题,如何解决?

🏆本文收录于 《全栈 Bug 调优(实战版)》 专栏。专栏聚焦真实项目中的各类疑难 Bug,从成因剖析 → 排查路径 → 解决方案 → 预防优化全链路拆解,形成一套可复用、可沉淀的实战知识体系。无论你是初入职场的开发者,还是负责复杂项目的资深工程师,都可以在这里构建一套属…...

Spring Boot 中的 Redisson 分布式锁

Redisson 分布式锁依赖 <dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.23.5</version> </dependency>application.yml 配置 spring:redis:host: localhostpo…...

CentOS8 K8S

K8s–day01 理论 优势 自我修复 弹性伸缩 自动部署和回滚 服务发现和负载均衡 机密和配置管理 存储编排 批处理 Kubernetes Lable Pod 组件 控制面板组件 资源 类似于java 中的类 对象 规约spec : 描述对象的期望状态,对象具有的特征 状态:表示对象的实际状态 元数据…...

I3C Host Adapter Pro+ (3)

Easyi3C是一家领先的公司嵌入式系统该公司是一家工具提供商&#xff0c;致力于简化各种通信协议的开发和调试。其产品系列旨在帮助工程师和开发人员更高效地使用I3C、I2C等协议。 3. I3C总线时序测试&#xff1a; 根据 MIPI 协议&#xff08;如下图所示&#xff09;&#xff0…...

[Linux实战] 手把手部署Emby媒体服务器:从安装到外网访问

1. 为什么你需要一个自己的Emby媒体服务器&#xff1f; 不知道你有没有过这样的经历&#xff1a;电脑硬盘里存了几百部电影、几十季美剧&#xff0c;还有家人出游拍的无数视频和照片。想看的时候&#xff0c;要么得把移动硬盘翻出来插上&#xff0c;要么得在电脑文件夹里找半天…...

【深度学习】Upsample模块采样方式实战对比:从原理到代码实现

1. 上采样&#xff1a;从“放大镜”到“想象力”的跨越 在深度学习的图像世界里&#xff0c;上采样&#xff08;Upsample&#xff09;就像是一个神奇的“放大镜”。想象一下&#xff0c;你有一张模糊的老照片&#xff0c;想把它放大看清楚细节&#xff0c;但又不想让它变得更模…...

使用Docker Compose快速部署Nominatim地理编码服务

1. 为什么你需要一个自己的地理编码服务&#xff1f; 如果你正在开发一个地图应用、物流系统&#xff0c;或者任何需要将地址转换成经纬度&#xff08;地理编码&#xff09;&#xff0c;或者反过来将经纬度转换成地址&#xff08;反向地理编码&#xff09;的功能&#xff0c;你…...

抖音数据抓取第一步:雷电模拟器3.107版保姆级配置指南(含Xposed框架安装)

抖音数据抓取第一步&#xff1a;雷电模拟器3.107版保姆级配置指南&#xff08;含Xposed框架安装&#xff09; 如果你正准备踏入移动应用数据分析或自动化测试的领域&#xff0c;那么一个稳定、可控的安卓模拟器环境就是你不可或缺的“数字沙盒”。无论是为了研究热门应用的交互…...

VMware Workstation 16 Pro下RHEL8安装全流程:从ISO到桌面环境(附常见问题解决)

在VMware Workstation 16 Pro上优雅部署RHEL 8&#xff1a;一份面向开发者的深度配置指南 对于需要在本地构建稳定、可控的Linux开发或测试环境的工程师而言&#xff0c;在虚拟机中部署一个企业级的操作系统是日常工作流中至关重要的一环。Red Hat Enterprise Linux 8&#xff…...

DisplayPort链路训练实战:深入解析时钟恢复(CR)的机制与调试

1. 从黑屏到点亮&#xff1a;为什么时钟恢复是DP调试的第一道坎 大家好&#xff0c;我是老张&#xff0c;在芯片原厂和硬件设计圈里摸爬滚打了十几年&#xff0c;经手调试过的DisplayPort接口没有一千也有八百了。今天想和大家掏心窝子聊聊一个让无数硬件工程师头疼&#xff0c…...

SpringCloudGateway头信息处理全解析:从Forwarded到X-Forwarded的优先级与安全考量

Spring Cloud Gateway 头信息处理全解析&#xff1a;从Forwarded到X-Forwarded的优先级与安全考量 在微服务架构的实践中&#xff0c;API网关扮演着流量入口与统一管控的关键角色。Spring Cloud Gateway&#xff0c;作为Spring Cloud生态中基于响应式编程模型的网关组件&#x…...

Gogs大文件上传避坑指南:如何避免RPC failed和HTTP 413错误(含Nginx配置技巧)

Gogs大文件上传避坑指南&#xff1a;如何避免RPC failed和HTTP 413错误&#xff08;含Nginx配置技巧&#xff09; 你是否曾经在向自己的Gogs代码仓库推送一个包含大型二进制文件&#xff08;比如数据集、编译产物或者设计稿&#xff09;的提交时&#xff0c;满怀期待地敲下git …...

分组密码设计实战:为什么AES选择SPN而DES用Feistel?从硬件到安全的深度解析

分组密码设计的十字路口&#xff1a;为何AES与DES走向了不同的架构&#xff1f; 在嵌入式设备里为一个加密算法选择硬件方案时&#xff0c;工程师们常常面临一个根本性的抉择&#xff1a;是采用结构规整、加解密相似的Feistel网络&#xff0c;还是拥抱混淆扩散效率更高、但实现…...

Zotero插件:Green Frog(绿青蛙)与easyScholar联动配置全攻略

1. 为什么你需要Green Frog和easyScholar这对黄金搭档&#xff1f; 如果你是一名研究生、博士生&#xff0c;或者任何需要和大量文献打交道的科研工作者&#xff0c;我猜你一定有过这样的经历&#xff1a;在知网、谷歌学术或者Web of Science上吭哧吭哧地找文献&#xff0c;看到…...

Python实战:用ncnn验证模型转换成功的3种方法(附完整代码)

Python实战&#xff1a;用ncnn验证模型转换成功的3种方法&#xff08;附完整代码&#xff09; 最近在移动端部署模型时&#xff0c;ncnn框架成了不少开发者的首选。它轻量、高效&#xff0c;但模型从PyTorch或TensorFlow转换到ncnn格式后&#xff0c;心里总有点不踏实&#xff…...

验证码漏洞防御指南:从短信轰炸到前端绕过的7种防护方案

验证码安全架构实战&#xff1a;构建无懈可击的防御纵深体系 在数字化业务高速发展的今天&#xff0c;验证码作为人机识别与业务安全的第一道闸门&#xff0c;其重要性不言而喻。然而&#xff0c;许多开发团队和安全负责人常常陷入一个误区&#xff1a;认为部署了验证码就等同于…...

蓝队工具,一款小白都能用的Windows应急溯源工具,支持AI一键分析

0x01 工具介绍 WinTracePro 作为面向蓝队的轻量化主机溯源分析工具&#xff0c;聚焦小白友好与实战高效两大核心&#xff0c;覆盖主机信息采集、日志深度分析、任务调度核查等蓝队核心溯源场景。V1.0 已实现 Windows 多版本系统兼容&#xff0c;集成 IP 情报查询、AI 辅助分析…...

GDAL核心功能解析:为什么它是地理空间数据处理的终极选择

GDAL核心功能解析&#xff1a;为什么它是地理空间数据处理的终极选择 【免费下载链接】gdal GDAL is an open source MIT licensed translator library for raster and vector geospatial data formats. 项目地址: https://gitcode.com/gh_mirrors/gd/gdal GDAL&#xf…...

多线程Web代理服务器:Computer-Networking-A-Top-Down-Approach-NOTES作业4教程

多线程Web代理服务器&#xff1a;Computer-Networking-A-Top-Down-Approach-NOTES作业4教程 【免费下载链接】Computer-Networking-A-Top-Down-Approach-NOTES 《计算机网络&#xff0d;自顶向下方法(原书第6版)》编程作业&#xff0c;Wireshark实验文档的翻译和解答。 项目地…...

ABAP Function ALV实战:如何让采购单号点击跳转ME23N(附完整代码)

ABAP Function ALV交互实战&#xff1a;从静态表格到动态业务门户的构建 在SAP的日常开发与运维中&#xff0c;我们常常面对这样的场景&#xff1a;业务用户打开一个采购订单清单报表&#xff0c;面对密密麻麻的单号&#xff0c;他们需要逐一手动复制&#xff0c;再打开ME23N事…...

Linux代理配置避坑指南:为什么你的wget/curl总是失败?

Linux网络代理配置深度解析&#xff1a;从环境变量到工具链的实战避坑手册 如果你在Linux服务器上折腾过网络代理&#xff0c;大概率经历过这样的场景&#xff1a;明明按照教程设置了http_proxy&#xff0c;wget下载却依然龟速甚至直接报错&#xff1b;curl命令时而灵时而不灵&…...

为什么连WiFi能刷抖音却打不开百度?一文读懂DNS工作原理与急救设置

为什么连WiFi能刷抖音却打不开百度&#xff1f;一文读懂DNS工作原理与急救设置 你有没有遇到过这种让人抓狂的情况&#xff1f;家里的Wi-Fi明明显示已连接&#xff0c;手机上的抖音、微信刷得飞起&#xff0c;消息秒发秒收&#xff0c;可当你打开浏览器&#xff0c;想查点资料或…...

iPhone照片太多?教你3招清理iCloud空间但不删手机照片(附详细步骤)

iPhone照片管理终极指南&#xff1a;释放iCloud空间&#xff0c;无损保留手机回忆 每次打开iPhone&#xff0c;看到那个“iCloud存储空间已满”的弹窗&#xff0c;是不是瞬间心情就不好了&#xff1f;5GB的免费空间&#xff0c;对于爱拍照的我们来说&#xff0c;简直杯水车薪。…...

ISTQB-CTFL 4.0核心考点解析与实战模拟(终极指南)

1. 软件测试基础&#xff1a;从“找茬”到“建立信心” 很多刚接触软件测试的朋友&#xff0c;可能会觉得测试就是“找bug”&#xff0c;拿着软件点点点&#xff0c;发现哪里不对就报个问题。这个理解不能说错&#xff0c;但太片面了&#xff0c;尤其是在ISTQB-CTFL 4.0的体系里…...