python如何使用Rabbitmq
目录
一、Rabbitmq介绍
二、Rabbitmq的使用场景
1、异步处理
2、服务解耦
3、流量削峰
4、日志收集
5、发布订阅
6、任务调度
三、python如何使用Rabbitmq
1、安装依赖
2、基础使用
3、消息确认
4、消息持久化
5、公平调度
6、发布订阅
7、关键字发布
一、Rabbitmq介绍
RabbitMQ是一个开源的消息中间件,基于AMQP(Advanced Message Queue Protocol,高级消息队列协议)协议实现。RabbitMQ被广泛应用于各种应用场景,如异步任务处理、日志传输、实时消息推送等。在微服务架构中,RabbitMQ是一个常见的消息中间件选择,它可以帮助服务之间实现解耦和异步通信,提高系统的可扩展性和稳定性。RabbitMQ提供了一个简单的用户页面,用户可以监控和管理消息、队列、交换器、绑定等资源。通过管理界面,用户可以直观地了解系统的运行状态,并进行相应的配置和管理操作。
二、Rabbitmq的使用场景
1、异步处理
- 在Web应用中,当用户提交表单时,可以将表单处理任务发送给RabbitMQ,由后台服务异步处理,从而提高用户界面的响应速度。
- 在电商系统中,用户下单后,订单处理、库存更新、支付通知等任务可以异步执行,避免阻塞主线程。
2、服务解耦
- 在微服务架构中,不同服务之间通过RabbitMQ进行通信,可以降低服务之间的耦合度,提高系统的可扩展性和可维护性。
- 当某个服务需要升级或维护时,可以通过RabbitMQ实现服务的平滑过渡,而不会影响其他服务的正常运行。
3、流量削峰
- 在高并发场景中,RabbitMQ可以作为一个缓冲层,接收并存储大量的请求,然后按照设定的速率将请求转发给后端服务,从而避免后端服务因过载而崩溃。
- 通过RabbitMQ的限流和队列机制,可以有效地控制请求的速率和数量,保护后端服务的稳定性。
4、日志收集
- RabbitMQ可以用于收集分散在各个服务器上的日志信息,将它们集中到一个或多个日志处理服务中,进行统一的分析和处理。
- 通过RabbitMQ,可以实现日志的实时收集、分析和报警,提高系统的运维效率和故障排查能力。
5、发布订阅
- 在需要向多个客户端推送消息的场景中,如实时通知、消息推送等,可以使用RabbitMQ的Fanout交换器将消息广播给所有绑定的队列。
- 通过RabbitMQ的消息广播机制,可以实现实时、可靠的消息推送服务,提高用户体验。
6、任务调度
- RabbitMQ可以与其他任务调度框架(如Quartz)结合使用,实现定时任务、延迟任务等复杂任务调度需求。
- 通过RabbitMQ的任务调度功能,可以灵活地控制任务的执行时间和频率,提高系统的自动化程度和运行效率。
三、python如何使用Rabbitmq
Rabbitmq网址:https://www.rabbitmq.com/tutorials
1、安装依赖
安装第三方库pika:pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pika
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pika
2、基础使用
生产者模型代码如下
import pika# 生产者模型connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 连接本地的rabbitmq
channel = connection.channel() # 连接通道
channel.queue_declare(queue='list') # 创建一个名叫list的队列
channel.basic_publish(exchange='',routing_key='list', # 向那个队列发布信息body='lol',# 发布的信息)
connection.close() # 关闭连接
消费者代码如下
import pika# 消费者模型connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 连接本地的rabbitmq
channel = connection.channel() # 连接通道
channel.queue_declare(queue='list') # 为确保队列存在,创建一个list队列def callback(ch, method, properties, body):print(f" [x] 消费者接收到了任务 {body}")channel.basic_consume(queue='list',on_message_callback=callback,auto_ack=True) # 接收信息:queue表示监听的队列,on_message_callback表示接收到信息执行的函数,auto_ack表示默认执行回复
channel.start_consuming() # 开启永无止境的循环监听该队列
3、消息确认
在队列中,执行任务可能需要几秒钟,您可能想知道如果 使用者启动一个长任务,并在完成之前终止。 使用我们当前的代码,一旦 RabbitMQ 将消息传递给消费者,它就会 立即将其标记为删除。在这种情况下,如果您终止 worker,则它刚刚处理的消息丢失了。调度的消息 对于这个尚未处理的特定 worker 来说,也会丢失。为了确保消息永远不会丢失,RabbitMQ 支持消息确认。ack由 consumer 告诉 RabbitMQ 已收到特定消息, 处理,并且 RabbitMQ 可以自由删除它。
这是由消费者来进行改变的,代码如下:
需要将auto_ack改为False,然后在回调函数里加入ch.basic_ack(delivery_tag=method.delivery_tag)
import pika# 消费者模型connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 连接本地的rabbitmq
channel = connection.channel() # 连接通道
channel.queue_declare(queue='list') # 创建一个队列def callback(ch, method, properties, body):print(f" [x] 消费者接收到了任务 {body}")ch.basic_ack(delivery_tag=method.delivery_tag) # 给队列回复进行确认channel.basic_consume(queue='list2',on_message_callback=callback,auto_ack=False) # auto_ack改为False
channel.start_consuming() # 开启永无止境的循环监听该队列
4、消息持久化
如果 RabbitMQ 服务器 停止。
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息 除非你告诉它不要这样做。需要做两件事来确保 消息不会丢失:我们需要将队列和消息都标记为 耐用。然和把队列和消息保存在磁盘里。
这是有生产者来进行改变的,代码如下:
durable=True在创建队列时声明持久化,delivery_mode=pika.DeliveryMode.Persistent让信息做持久化
import pika# 生产者模型connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 连接本地的rabbitmq
channel = connection.channel() # 连接通道
channel.queue_declare(queue='list2',durable=True) # 创建一个队列,durable=True表示队列支持持久化
channel.basic_publish(exchange='',routing_key='list2', # 向那个队列发布信息body='lol',# 发布的信息properties=pika.BasicProperties(delivery_mode=pika.DeliveryMode.Persistent # 让信息做持久化))
connection.close() # 关闭连接
5、公平调度
您可能已经注意到,调度仍然没有完全工作 正如我们想要的那样。例如,在有两个 worker 的情况下,当所有 奇数消息很重,偶数消息很轻,一个 worker 将是 一直很忙,另一个几乎不做任何工作。井 RabbitMQ 对此一无所知,仍会 dispatch 消息均匀。
发生这种情况是因为 RabbitMQ 只是在消息 进入队列。它不看未确认的数量 消息。它只是盲目地调度每 n 条消息 到第 n 个消费者。
为了解决这个问题,我们可以使用带有 setup 的 channel 方法。它使用协议方法告诉 RabbitMQ 不要一次向 worker 提供多条消息。或者,换句话说,不要调度 向 worker 发送新消息,直到它处理并确认 上一个。相反,它会将其分派给下一个仍然不忙的 worker。
这是由消费者来进行改变的,代码如下:
channel.basic_qos(prefetch_count=1)进行闲置派发
import pika# 消费者模型connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 连接本地的rabbitmq
channel = connection.channel() # 连接通道
channel.queue_declare(queue='list') # 为确保队列存在,创建一个list队列def callback(ch, method, properties, body):print(f" [x] 消费者接收到了任务 {body}")channel.basic_qos(prefetch_count=1) # 闲置派发
channel.basic_consume(queue='list',on_message_callback=callback,auto_ack=True) # 接收信息:queue表示监听的队列,on_message_callback表示接收到信息执行的函数,auto_ack表示默认执行回复
channel.start_consuming() # 开启永无止境的循环监听该队列
6、发布订阅
发布订阅模式是将信息发布给所有的订阅者,其特点就是有交换机。
channel.exchange_declare(exchange='m1',exchange_type='fanout')声明一个交换机,类型为fanout,是将信息发给所有的订阅者
发布者代码如下:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 连接rabbitmq
channel = connection.channel() # 连接通道channel.exchange_declare(exchange='m1',exchange_type='fanout') # fanout:将信息发给所有的队列
channel.queue_declare(queue='') # 创建一个队列,durable=True表示队列支持持久化
channel.basic_publish(exchange='m1',routing_key='', # 队列名称body='lol',)# 发布的数据
connection.close() # 关闭连接
订阅者代码如下:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 连接rabbitmq
channel = connection.channel()channel.exchange_declare(exchange='m1',exchange_type='fanout',) # fanout:将信息发给所有的队列
result=channel.queue_declare(exclusive=True,queue='') # 创建一个随机队列
queue_name=result.method.queue # 拿到队列名字
print(queue_name)
channel.queue_bind(exchange='m1',queue=queue_name) # 对exchange和queue进行绑定def callback(ch, method, properties, body):print(f" [x] 消费者接收到了任务 {body}")channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
channel.start_consuming() # 开启永无止境的循环监听该队列
7、关键字发布
关键字发布就是在发布订阅模式基础上,将不同信息发布给不同的订阅者。
channel.exchange_declare(exchange='m2',exchange_type='direct') ,声明一个交换机,交换机的类型为direct,将信息发布给指定的订阅者。
发布者代码如下:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 连接rabbitmq
channel = connection.channel() # 连接通道channel.exchange_declare(exchange='m2',exchange_type='direct') # direct:将信息发布给指定的订阅者
channel.queue_declare(queue='') # 创建一个队列,durable=True表示队列支持持久化
channel.basic_publish(exchange='m2',routing_key='hhq', # 队列名称body='cf',)# 发布的数据
connection.close() # 关闭连接
订阅者代码如下:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) # 连接rabbitmq
channel = connection.channel()channel.exchange_declare(exchange='m2',exchange_type='direct') # exchange:交易所的名称,fanout:将信息发给所有的队列
result=channel.queue_declare(exclusive=True,queue='') # 创建一个随机队列
queue_name=result.method.queue # 拿到队列名字
print(queue_name)
channel.queue_bind(exchange='m2',queue=queue_name,routing_key='hhq') # 对exchange和queue进行绑定def callback(ch, method, properties, body):print(f" [x] 消费者接收到了任务 {body}")channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)
channel.start_consuming() # 开启永无止境的循环监听该队列
相关文章:
python如何使用Rabbitmq
目录 一、Rabbitmq介绍 二、Rabbitmq的使用场景 1、异步处理 2、服务解耦 3、流量削峰 4、日志收集 5、发布订阅 6、任务调度 三、python如何使用Rabbitmq 1、安装依赖 2、基础使用 3、消息确认 4、消息持久化 5、公平调度 6、发布订阅 7、关键字发布 一、Rabbi…...
分布式,微服务,SpringCloudAlibaba,nacos,gateway,openFeign
想学习微服务SpringCloudAlibaba的小伙伴,可以观看视频 地址: https://www.bilibili.com/video/BV1cFDEYWEkY/?vd_source14d27ec13a4737c281b7c79463687112分布式架构和微服务是两个密切相关但又有所区别的概念。它们在现代软件工程中经常被提及&#…...
MySQL初学之旅(3)约束
目录 1.前言 2.正文 2.1约束类型 2.2NULL约束 2.3UNIQUE约束 2.4DEFAULT约束 2.5PRIMARY KEY主键约束 2.6FOREIGN KEY外键约束 2.7CHECK约束 3.小结 1.前言 哈喽大家好啊,今儿来继续给大家分享最近学习的MySQL和约束相关的知识点,希望大家一起…...
使用YOLOv9进行图像与视频检测
大家好,YOLOv9 与其前身v8一样,专注于识别和精确定位图像和视频中的对象。本文将介绍如何使用YOLOv9进行图像与视频检测,自动驾驶汽车、安全系统和高级图像搜索等应用在很大程度上依赖于此功能,YOLOv9 引入了比 YOLOv8 更令人印象…...
C# 中的 LINQ:轻松处理集合和数据
C#中的LINQ(Language Integrated Query),这是一个非常强大且实用的功能,可以简化集合操作和数据查询。以下是一篇关于C#中LINQ使用的文章。 引言 LINQ(Language Integrated Query)是C#语言的一个重要特性…...
【征稿倒计时!华南理工大学主办 | IEEE出版 | EI检索稳定】2024智能机器人与自动控制国际学术会议 (IRAC 2024)
#华南理工大学主办!#IEEE出版!EI稳定检索!#组委阵容强大!IEEE Fellow、国家杰青等学术大咖领衔出席!#会议设置“优秀论文”“优秀青年学者报告”“优秀海报”等评优奖项 2024智能机器人与自动控制国际学术会议 &#…...
RHCE的学习(20)
变量5种赋值方式 shell中变量赋值5种方式,其中采用name10的方法称A 直接赋值 nameB read命令 read v1C 使用命令行参数 ($1 $2 $3 ..) name$1D 使用命令的输入 username$(whoami)E 从文件读取 #cut -d : -f1 /etc/passwd > /user.listfor…...
控制器ThinkPHP6
五、控制器中对数组值的返回 在做接口服务时,很多时候回使用数组作为返回值,那么数组如何返回成 json呢? 在 tp6 中返回json 很简单,直接使用 json 进行返回即可,例如: public function index(){$resarra…...
1. Django中的URL调度器 (项目创建与简单测试)
1. 创建 Django 项目 运行以下命令创建一个名为 blog_project 的 Django 项目: django-admin startproject blog_project2. 创建博客应用 Django 中,项目可以包含多个应用。创建一个名为 blog 的应用: cd blog_project python manage.py …...
学习python的第十天之数据类型——dict字典
学习python的第十天之数据类型——dict字典 Python 中的字典(Dictionary)是一个非常强大的内置数据类型,它用来存储键值对(key-value pairs)信息。字典是无序的,这意味着它们不会记录你添加键值对的顺序&am…...
华为Mate 70临近上市:代理IP与抢购攻略
随着科技的飞速发展,智能手机已经成为我们日常生活中不可或缺的一部分。而在众多智能手机品牌中,华为一直以其卓越的技术和创新力引领着行业的发展。近日,华为Mate 70系列手机的发布会正式定档在11月26日,这一消息引发了众多科技爱…...
进程信号
目录 信号入门 1. 生活角度的信号 2. 技术应用角度的信号 3. 注意 4. 信号概念 5. 用kill -l命令可以察看系统定义的信号列表 6. 信号处理常见方式概览 产生信号 1. 通过终端按键产生信号 Core Dump 2. 调用系统函数向进程发信号 3. 由软件条件产生信号 4. 硬件异…...
RT-DETR融合GhostModel V3及相关改进思路
RT-DETR使用教程: RT-DETR使用教程 RT-DETR改进汇总贴:RT-DETR更新汇总贴 《GhostNetV3: Exploring the Training Strategies for Compact Models》 一、 模块介绍 论文链接:https://arxiv.org/pdf/2404.11202v1 代码链接:https:…...
JVM有哪些垃圾回收器
Serial垃圾回收器:单线程收集器,适用于客户端模式下的小型应用。 使用复制算法回收新生代,使用标记-整理算法回收老年代。 在进行垃圾回收时,会停止所有用户线程(Stop-The-World, STW)。Serial Old垃圾回收…...
EWM 打印
目录 1 简介 2 后台配置 3 主数据 4 业务操作 1 简介 打印即输出管理(output management)利用“条件表”那一套理论实现。而当打印跟 EWM 集成到一起时,也需要利用 PPF(Post Processing Framework)那一套理论。而…...
前端文件优化
一、图片优化 计算图片大小 对于一张100*100像素的图片来说,图像上有 10000 个像素点,如果每个像素的值是 RGBA 存储的话,那么也就是说每个像素有 4 个通道,每个通道 1 个字节(8 位 1个字 节)࿰…...
电脑怎么自动切换IP地址
在现代网络环境中,电脑自动切换IP地址的需求日益增多。无论是出于网络安全、隐私保护,还是为了绕过地域限制,自动切换IP地址都成为了许多用户关注的焦点。本文将详细介绍几种实现电脑自动切换IP地址的方法,以满足不同用户的需求。…...
hbase集成phoenix
1.环境 环境准备 三台节点zookeeper三节点hadoop三节点hbase三节点 2.pheonix集成 官网下载地址,需挂梯子,使用官网推荐的对应hbase版本即可 https://phoenix.apache.org/download.html下载及解压 wget https://dlcdn.apache.org/phoenix/phoenix-…...
单片机智能家居火灾环境安全检测
目录 前言 一、本设计主要实现哪些很“开门”功能? 二、电路设计原理图 电路图采用Altium Designer进行设计: 三、实物设计图 四、程序源代码设计 五、获取资料内容 前言 在现代社会,火灾安全始终是人们关注的重点问题。随着科技的不…...
Git_2024/11/16
文章目录 前言Git是什么核心概念工作流程常见术语解读Git的优势 Git与SVN对比SVNGit总结 Git配置流程及指令环境配置获取Git仓库本地初始化远程克隆 工作目录、暂存区、版本库文件的两种状态本地仓库操作远程仓库操作Git分支Git标签IntelliJ IDEA使用Git回滚代码 GitHub配置流程…...
高危文件识别的常用算法:原理、应用与企业场景
高危文件识别的常用算法:原理、应用与企业场景 高危文件识别旨在检测可能导致安全威胁的文件,如包含恶意代码、敏感数据或欺诈内容的文档,在企业协同办公环境中(如Teams、Google Workspace)尤为重要。结合大模型技术&…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...
leetcodeSQL解题:3564. 季节性销售分析
leetcodeSQL解题:3564. 季节性销售分析 题目: 表:sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
OpenLayers 分屏对比(地图联动)
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能,和卷帘图层不一样的是,分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…...
JavaScript基础-API 和 Web API
在学习JavaScript的过程中,理解API(应用程序接口)和Web API的概念及其应用是非常重要的。这些工具极大地扩展了JavaScript的功能,使得开发者能够创建出功能丰富、交互性强的Web应用程序。本文将深入探讨JavaScript中的API与Web AP…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...
Python 高效图像帧提取与视频编码:实战指南
Python 高效图像帧提取与视频编码:实战指南 在音视频处理领域,图像帧提取与视频编码是基础但极具挑战性的任务。Python 结合强大的第三方库(如 OpenCV、FFmpeg、PyAV),可以高效处理视频流,实现快速帧提取、压缩编码等关键功能。本文将深入介绍如何优化这些流程,提高处理…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现指南针功能
指南针功能是许多位置服务应用的基础功能之一。下面我将详细介绍如何在HarmonyOS 5中使用DevEco Studio实现指南针功能。 1. 开发环境准备 确保已安装DevEco Studio 3.1或更高版本确保项目使用的是HarmonyOS 5.0 SDK在项目的module.json5中配置必要的权限 2. 权限配置 在mo…...
【若依】框架项目部署笔记
参考【SpringBoot】【Vue】项目部署_no main manifest attribute, in springboot-0.0.1-sn-CSDN博客 多一个redis安装 准备工作: 压缩包下载:http://download.redis.io/releases 1. 上传压缩包,并进入压缩包所在目录,解压到目标…...
