当前位置: 首页 > news >正文

python如何使用Rabbitmq

目录

一、Rabbitmq介绍

二、Rabbitmq的使用场景

1、异步处理

2、服务解耦

3、流量削峰

4、日志收集

5、发布订阅

6、任务调度

三、python如何使用Rabbitmq

1、安装依赖

2、基础使用

3、消息确认

4、消息持久化

5、公平调度

6、发布订阅

7、关键字发布


一、Rabbitmq介绍

RabbitMQ是一个开源的消息中间件,基于AMQP(Advanced Message Queue Protocol,高级消息队列协议)协议实现。RabbitMQ被广泛应用于各种应用场景,如异步任务处理、日志传输、实时消息推送等。在微服务架构中,RabbitMQ是一个常见的消息中间件选择,它可以帮助服务之间实现解耦和异步通信,提高系统的可扩展性和稳定性。RabbitMQ提供了一个简单的用户页面,用户可以监控和管理消息、队列、交换器、绑定等资源。通过管理界面,用户可以直观地了解系统的运行状态,并进行相应的配置和管理操作。

二、Rabbitmq的使用场景

1、异步处理

  • 在Web应用中,当用户提交表单时,可以将表单处理任务发送给RabbitMQ,由后台服务异步处理,从而提高用户界面的响应速度。
  • 在电商系统中,用户下单后,订单处理、库存更新、支付通知等任务可以异步执行,避免阻塞主线程。

2、服务解耦

  • 在微服务架构中,不同服务之间通过RabbitMQ进行通信,可以降低服务之间的耦合度,提高系统的可扩展性和可维护性。
  • 当某个服务需要升级或维护时,可以通过RabbitMQ实现服务的平滑过渡,而不会影响其他服务的正常运行。

3、流量削峰

  • 在高并发场景中,RabbitMQ可以作为一个缓冲层,接收并存储大量的请求,然后按照设定的速率将请求转发给后端服务,从而避免后端服务因过载而崩溃。
  • 通过RabbitMQ的限流和队列机制,可以有效地控制请求的速率和数量,保护后端服务的稳定性。

4、日志收集

  • RabbitMQ可以用于收集分散在各个服务器上的日志信息,将它们集中到一个或多个日志处理服务中,进行统一的分析和处理。
  • 通过RabbitMQ,可以实现日志的实时收集、分析和报警,提高系统的运维效率和故障排查能力。

5、发布订阅

  • 在需要向多个客户端推送消息的场景中,如实时通知、消息推送等,可以使用RabbitMQ的Fanout交换器将消息广播给所有绑定的队列。
  • 通过RabbitMQ的消息广播机制,可以实现实时、可靠的消息推送服务,提高用户体验。

6、任务调度

  • RabbitMQ可以与其他任务调度框架(如Quartz)结合使用,实现定时任务、延迟任务等复杂任务调度需求。
  • 通过RabbitMQ的任务调度功能,可以灵活地控制任务的执行时间和频率,提高系统的自动化程度和运行效率。

三、python如何使用Rabbitmq

Rabbitmq网址https://www.rabbitmq.com/tutorials

1、安装依赖

安装第三方库pika:pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pika

pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pika

2、基础使用

生产者模型代码如下

import pika# 生产者模型connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 连接本地的rabbitmq
channel = connection.channel()  # 连接通道
channel.queue_declare(queue='list')  #  创建一个名叫list的队列
channel.basic_publish(exchange='',routing_key='list',  # 向那个队列发布信息body='lol',# 发布的信息)
connection.close()  # 关闭连接

消费者代码如下

import pika# 消费者模型connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 连接本地的rabbitmq
channel = connection.channel()  # 连接通道
channel.queue_declare(queue='list')  #  为确保队列存在,创建一个list队列def callback(ch, method, properties, body):print(f" [x] 消费者接收到了任务 {body}")channel.basic_consume(queue='list',on_message_callback=callback,auto_ack=True)  # 接收信息:queue表示监听的队列,on_message_callback表示接收到信息执行的函数,auto_ack表示默认执行回复
channel.start_consuming()   # 开启永无止境的循环监听该队列

3、消息确认

在队列中,执行任务可能需要几秒钟,您可能想知道如果 使用者启动一个长任务,并在完成之前终止。 使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它就会 立即将其标记为删除。在这种情况下,如果您终止 worker,则它刚刚处理的消息丢失了。调度的消息 对于这个尚未处理的特定 worker 来说,也会丢失。为了确保消息永远不会丢失,RabbitMQ 支持消息确认。ack由 consumer 告诉 RabbitMQ 已收到特定消息, 处理,并且 RabbitMQ 可以自由删除它。

这是由消费者来进行改变的,代码如下:

需要将auto_ack改为False,然后在回调函数里加入ch.basic_ack(delivery_tag=method.delivery_tag)

import pika# 消费者模型connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 连接本地的rabbitmq
channel = connection.channel()  # 连接通道
channel.queue_declare(queue='list')  #  创建一个队列def callback(ch, method, properties, body):print(f" [x] 消费者接收到了任务 {body}")ch.basic_ack(delivery_tag=method.delivery_tag) # 给队列回复进行确认channel.basic_consume(queue='list2',on_message_callback=callback,auto_ack=False)  # auto_ack改为False
channel.start_consuming()  # 开启永无止境的循环监听该队列

4、消息持久化

如果 RabbitMQ 服务器 停止。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息 除非你告诉它不要这样做。需要做两件事来确保 消息不会丢失:我们需要将队列消息都标记为 耐用。然和把队列和消息保存在磁盘里。

这是有生产者来进行改变的,代码如下:

durable=True在创建队列时声明持久化,delivery_mode=pika.DeliveryMode.Persistent让信息做持久化

import pika# 生产者模型connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 连接本地的rabbitmq
channel = connection.channel()  # 连接通道
channel.queue_declare(queue='list2',durable=True)  #  创建一个队列,durable=True表示队列支持持久化
channel.basic_publish(exchange='',routing_key='list2',  # 向那个队列发布信息body='lol',# 发布的信息properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent  # 让信息做持久化))
connection.close()  # 关闭连接

5、公平调度

您可能已经注意到,调度仍然没有完全工作 正如我们想要的那样。例如,在有两个 worker 的情况下,当所有 奇数消息很重,偶数消息很轻,一个 worker 将是 一直很忙,另一个几乎不做任何工作。井 RabbitMQ 对此一无所知,仍会 dispatch 消息均匀。

发生这种情况是因为 RabbitMQ 只是在消息 进入队列。它不看未确认的数量 消息。它只是盲目地调度每 n 条消息 到第 n 个消费者。

为了解决这个问题,我们可以使用带有 setup 的 channel 方法。它使用协议方法告诉 RabbitMQ 不要一次向 worker 提供多条消息。或者,换句话说,不要调度 向 worker 发送新消息,直到它处理并确认 上一个。相反,它会将其分派给下一个仍然不忙的 worker。

这是由消费者来进行改变的,代码如下:

channel.basic_qos(prefetch_count=1)进行闲置派发

import pika# 消费者模型connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 连接本地的rabbitmq
channel = connection.channel()  # 连接通道
channel.queue_declare(queue='list')  #  为确保队列存在,创建一个list队列def callback(ch, method, properties, body):print(f" [x] 消费者接收到了任务 {body}")channel.basic_qos(prefetch_count=1)  # 闲置派发
channel.basic_consume(queue='list',on_message_callback=callback,auto_ack=True)  # 接收信息:queue表示监听的队列,on_message_callback表示接收到信息执行的函数,auto_ack表示默认执行回复
channel.start_consuming()   # 开启永无止境的循环监听该队列

6、发布订阅

发布订阅模式是将信息发布给所有的订阅者,其特点就是有交换机。

channel.exchange_declare(exchange='m1',exchange_type='fanout')声明一个交换机,类型为fanout,是将信息发给所有的订阅者

发布者代码如下:

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 连接rabbitmq
channel = connection.channel()  # 连接通道channel.exchange_declare(exchange='m1',exchange_type='fanout')  # fanout:将信息发给所有的队列
channel.queue_declare(queue='')  #  创建一个队列,durable=True表示队列支持持久化
channel.basic_publish(exchange='m1',routing_key='',  # 队列名称body='lol',)# 发布的数据
connection.close()  # 关闭连接

订阅者代码如下:

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 连接rabbitmq
channel = connection.channel()channel.exchange_declare(exchange='m1',exchange_type='fanout',)  # fanout:将信息发给所有的队列
result=channel.queue_declare(exclusive=True,queue='')  #  创建一个随机队列
queue_name=result.method.queue  # 拿到队列名字
print(queue_name)
channel.queue_bind(exchange='m1',queue=queue_name)  # 对exchange和queue进行绑定def callback(ch, method, properties, body):print(f" [x] 消费者接收到了任务 {body}")channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
channel.start_consuming()  # 开启永无止境的循环监听该队列

7、关键字发布

关键字发布就是在发布订阅模式基础上,将不同信息发布给不同的订阅者。

channel.exchange_declare(exchange='m2',exchange_type='direct') ,声明一个交换机,交换机的类型为direct,将信息发布给指定的订阅者。

发布者代码如下:

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 连接rabbitmq
channel = connection.channel()  # 连接通道channel.exchange_declare(exchange='m2',exchange_type='direct')  # direct:将信息发布给指定的订阅者
channel.queue_declare(queue='')  #  创建一个队列,durable=True表示队列支持持久化
channel.basic_publish(exchange='m2',routing_key='hhq',  # 队列名称body='cf',)# 发布的数据
connection.close()  # 关闭连接

订阅者代码如下:

import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))  # 连接rabbitmq
channel = connection.channel()channel.exchange_declare(exchange='m2',exchange_type='direct')  # exchange:交易所的名称,fanout:将信息发给所有的队列
result=channel.queue_declare(exclusive=True,queue='')  #  创建一个随机队列
queue_name=result.method.queue  # 拿到队列名字
print(queue_name)
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='hhq')  # 对exchange和queue进行绑定def callback(ch, method, properties, body):print(f" [x] 消费者接收到了任务 {body}")channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
channel.start_consuming()  # 开启永无止境的循环监听该队列

相关文章:

python如何使用Rabbitmq

目录 一、Rabbitmq介绍 二、Rabbitmq的使用场景 1、异步处理 2、服务解耦 3、流量削峰 4、日志收集 5、发布订阅 6、任务调度 三、python如何使用Rabbitmq 1、安装依赖 2、基础使用 3、消息确认 4、消息持久化 5、公平调度 6、发布订阅 7、关键字发布 一、Rabbi…...

分布式,微服务,SpringCloudAlibaba,nacos,gateway,openFeign

想学习微服务SpringCloudAlibaba的小伙伴,可以观看视频 地址: https://www.bilibili.com/video/BV1cFDEYWEkY/?vd_source14d27ec13a4737c281b7c79463687112分布式架构和微服务是两个密切相关但又有所区别的概念。它们在现代软件工程中经常被提及&#…...

MySQL初学之旅(3)约束

目录 1.前言 2.正文 2.1约束类型 2.2NULL约束 2.3UNIQUE约束 2.4DEFAULT约束 2.5PRIMARY KEY主键约束 2.6FOREIGN KEY外键约束 2.7CHECK约束 3.小结 1.前言 哈喽大家好啊,今儿来继续给大家分享最近学习的MySQL和约束相关的知识点,希望大家一起…...

使用YOLOv9进行图像与视频检测

大家好,YOLOv9 与其前身v8一样,专注于识别和精确定位图像和视频中的对象。本文将介绍如何使用YOLOv9进行图像与视频检测,自动驾驶汽车、安全系统和高级图像搜索等应用在很大程度上依赖于此功能,YOLOv9 引入了比 YOLOv8 更令人印象…...

C# 中的 LINQ:轻松处理集合和数据

C#中的LINQ(Language Integrated Query),这是一个非常强大且实用的功能,可以简化集合操作和数据查询。以下是一篇关于C#中LINQ使用的文章。 引言 LINQ(Language Integrated Query)是C#语言的一个重要特性…...

【征稿倒计时!华南理工大学主办 | IEEE出版 | EI检索稳定】2024智能机器人与自动控制国际学术会议 (IRAC 2024)

#华南理工大学主办!#IEEE出版!EI稳定检索!#组委阵容强大!IEEE Fellow、国家杰青等学术大咖领衔出席!#会议设置“优秀论文”“优秀青年学者报告”“优秀海报”等评优奖项 2024智能机器人与自动控制国际学术会议 &#…...

RHCE的学习(20)

变量5种赋值方式 shell中变量赋值5种方式,其中采用name10的方法称A 直接赋值 nameB read命令 read v1C 使用命令行参数 ($1 $2 $3 ..) name$1D 使用命令的输入 username$(whoami)E 从文件读取 #cut -d : -f1 /etc/passwd > /user.listfor…...

控制器ThinkPHP6

五、控制器中对数组值的返回 在做接口服务时,很多时候回使用数组作为返回值,那么数组如何返回成 json呢? 在 tp6 中返回json 很简单,直接使用 json 进行返回即可,例如: public function index(){$resarra…...

1. Django中的URL调度器 (项目创建与简单测试)

1. 创建 Django 项目 运行以下命令创建一个名为 blog_project 的 Django 项目: django-admin startproject blog_project2. 创建博客应用 Django 中,项目可以包含多个应用。创建一个名为 blog 的应用: cd blog_project python manage.py …...

学习python的第十天之数据类型——dict字典

学习python的第十天之数据类型——dict字典 Python 中的字典(Dictionary)是一个非常强大的内置数据类型,它用来存储键值对(key-value pairs)信息。字典是无序的,这意味着它们不会记录你添加键值对的顺序&am…...

华为Mate 70临近上市:代理IP与抢购攻略

随着科技的飞速发展,智能手机已经成为我们日常生活中不可或缺的一部分。而在众多智能手机品牌中,华为一直以其卓越的技术和创新力引领着行业的发展。近日,华为Mate 70系列手机的发布会正式定档在11月26日,这一消息引发了众多科技爱…...

进程信号

目录 信号入门 1. 生活角度的信号 2. 技术应用角度的信号 3. 注意 4. 信号概念 5. 用kill -l命令可以察看系统定义的信号列表 6. 信号处理常见方式概览 产生信号 1. 通过终端按键产生信号 Core Dump 2. 调用系统函数向进程发信号 3. 由软件条件产生信号 4. 硬件异…...

RT-DETR融合GhostModel V3及相关改进思路

RT-DETR使用教程: RT-DETR使用教程 RT-DETR改进汇总贴:RT-DETR更新汇总贴 《GhostNetV3: Exploring the Training Strategies for Compact Models》 一、 模块介绍 论文链接:https://arxiv.org/pdf/2404.11202v1 代码链接:https:…...

JVM有哪些垃圾回收器

Serial垃圾回收器:单线程收集器,适用于客户端模式下的小型应用。 使用复制算法回收新生代,使用标记-整理算法回收老年代。 在进行垃圾回收时,会停止所有用户线程(Stop-The-World, STW)。Serial Old垃圾回收…...

EWM 打印

目录 1 简介 2 后台配置 3 主数据 4 业务操作 1 简介 打印即输出管理(output management)利用“条件表”那一套理论实现。而当打印跟 EWM 集成到一起时,也需要利用 PPF(Post Processing Framework)那一套理论。而…...

前端文件优化

一、图片优化 计算图片大小 对于一张100*100像素的图片来说,图像上有 10000 个像素点,如果每个像素的值是 RGBA 存储的话,那么也就是说每个像素有 4 个通道,每个通道 1 个字节(8 位 1个字 节)&#xff0…...

电脑怎么自动切换IP地址

在现代网络环境中,电脑自动切换IP地址的需求日益增多。无论是出于网络安全、隐私保护,还是为了绕过地域限制,自动切换IP地址都成为了许多用户关注的焦点。本文将详细介绍几种实现电脑自动切换IP地址的方法,以满足不同用户的需求。…...

hbase集成phoenix

1.环境 环境准备 三台节点zookeeper三节点hadoop三节点hbase三节点 2.pheonix集成 官网下载地址,需挂梯子,使用官网推荐的对应hbase版本即可 https://phoenix.apache.org/download.html下载及解压 wget https://dlcdn.apache.org/phoenix/phoenix-…...

单片机智能家居火灾环境安全检测

目录 前言 一、本设计主要实现哪些很“开门”功能? 二、电路设计原理图 电路图采用Altium Designer进行设计: 三、实物设计图 四、程序源代码设计 五、获取资料内容 前言 在现代社会,火灾安全始终是人们关注的重点问题。随着科技的不…...

Git_2024/11/16

文章目录 前言Git是什么核心概念工作流程常见术语解读Git的优势 Git与SVN对比SVNGit总结 Git配置流程及指令环境配置获取Git仓库本地初始化远程克隆 工作目录、暂存区、版本库文件的两种状态本地仓库操作远程仓库操作Git分支Git标签IntelliJ IDEA使用Git回滚代码 GitHub配置流程…...

信号处理学习笔记5:卡尔曼滤波理论

卡尔曼滤波,用直白的话来讲, 就是有多个不确定的结果,经过分析、推理和计算,获得相对准确的结果。 它的核心特点是: 能够预测数据的未来趋势\({x}_{k}^{ }\) 结合当前数据进行修正,使预测更加准确 可以处理…...

华中科大大突破:让AI拥有“空间感“,从此告别“方向感缺失症“

这项由华中科技大学和百度公司联合开展的研究发表于2026年3月,论文编号为arXiv:2603.19235v1,研究团队提出了一个名为VEGA-3D(VideoExtracted Generative Awareness)的创新框架。有兴趣深入了解的读者可以通过该论文编号查询完整论…...

从OpenJDK到GraalVM:JDK21安装后,你还可以试试这些高性能Java运行时

从OpenJDK到GraalVM:JDK21安装后,你还可以试试这些高性能Java运行时 当你完成JDK21的基础安装后,Java生态的探索才刚刚开始。现代Java开发早已不再局限于传统JVM,越来越多的创新运行时正在重塑性能边界。本文将带你深入GraalVM、L…...

气象防灾实战:如何用QGIS快速生成暴雨等值面预警图?(含历史数据对比)

气象防灾实战:如何用QGIS快速生成暴雨等值面预警图?(含历史数据对比) 暴雨灾害的预警与防控一直是应急管理和市政规划领域的核心挑战。传统的气象数据分析往往依赖专业软件和复杂代码,让非技术背景的从业者望而却步。本…...

SAP EWM RF手持设备开发实战:从SPRO配置到屏幕绘制的完整流程

SAP EWM RF手持设备开发实战:从SPRO配置到屏幕绘制的完整流程 在仓储物流领域,SAP EWM(Extended Warehouse Management)系统的RF(Radio Frequency)手持设备开发一直是技术难点与业务痛点的交汇处。不同于传…...

动态规划 -- 最长公共子序列

最长公共子序列的结构设序列 X{x1,x2,…,x m} 和 Y{y1,y2,…,y n} 的最长公共子序列为 Z{z1,z2,…,z k},则有以下结论:若 x my n,则 z kx my n,且 Z k−1(即 Z 去掉最后一个元素 z k 后的子序列)是 X m−1&…...

Video-LLaMA部署指南:如何在本地服务器上高效运行多模态AI

Video-LLaMA部署指南:如何在本地服务器上高效运行多模态AI 【免费下载链接】Video-LLaMA [EMNLP 2023 Demo] Video-LLaMA: An Instruction-tuned Audio-Visual Language Model for Video Understanding 项目地址: https://gitcode.com/gh_mirrors/vi/Video-LLaMA …...

【自然语言处理】BERTopic:解决文本主题分析的5个创新方案

#【自然语言处理】BERTopic:解决文本主题分析的5个创新方案 【免费下载链接】BERTopic Leveraging BERT and c-TF-IDF to create easily interpretable topics. 项目地址: https://gitcode.com/gh_mirrors/be/BERTopic 在信息爆炸的时代,如何从海…...

从MobileNet到FasterNet:一个ARM安卓开发者的轻量级模型选型与部署实战笔记

从MobileNet到FasterNet:ARM安卓开发者的轻量级模型选型与部署实战 在移动端AI应用开发中,模型选型往往是一场精度与速度的博弈。作为一名长期奋战在ARM平台部署一线的工程师,我经历过太多次这样的场景:产品经理要求"既要实时…...

数学学习者的终极指南:如何高效利用开源资源库构建完整知识体系

数学学习者的终极指南:如何高效利用开源资源库构建完整知识体系 【免费下载链接】awesome-math A curated list of awesome mathematics resources 项目地址: https://gitcode.com/GitHub_Trending/aw/awesome-math 在数字化学习时代,如何从海量的…...