flask整合rabbitMQ插件的方式
文章目录
- 二、Python-flask-rabbitMQ-插件方式整合
- 引言
- 具体步骤
- 1 安装依赖:
- 2 编写实体类:
- 3 编写消费者和生产者:
- 4 初始化消费者和生产者:
- 5 其他地方使用生产者
二、Python-flask-rabbitMQ-插件方式整合
引言
当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。
本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。
首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。
同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。
此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。
通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。
总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。
具体步骤
1 安装依赖:
使用pip安装pika库:
pip install pika
2 编写实体类:
from queue import Queue
from threading import Lockimport pika# 定义交换机类型的枚举值
class ExchangeType:DEFAULT = 'default'DIRECT = "direct"FANOUT = "fanout"TOPIC = 'topic'class RabbitMQ:def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10):self.credentials = pika.PlainCredentials(username, password)self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials)self.connection_pool = Queue(pool_size) # 连接池,存储连接和信道self.lock = Lock() # 互斥锁,用于对连接池的访问进行同步for _ in range(pool_size):connection = self._create_connection()channel = connection.channel()self.connection_pool.put((connection, channel))def _create_connection(self):return pika.BlockingConnection(self.parameters)def get_channel(self):with self.lock:connection, channel = self.connection_pool.get() # 从连接池获取连接和信道return connection, channeldef release_channel(self, connection, channel):with self.lock:self.connection_pool.put((connection, channel)) # 将连接和信道放回连接池def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DEFAULT):connection, channel = self.get_channel()try:channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) # 声明交换机并指定类型channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message) # 发布消息finally:self.release_channel(connection, channel)def receive_messages(self, queue, callback):connection, channel = self.get_channel()try:channel.queue_declare(queue=queue, durable=True) # 声明队列并标记为持久化# channel.queue_purge(queue=queue) # 清空队列,以防之前的非持久化消息残留channel.basic_qos(prefetch_count=10) # 每次从 RabbitMQ 获取 10 条消息channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False) # 消费消息并设置回调函数channel.start_consuming() # 开始消费消息finally:self.release_channel(connection, channel)
3 编写消费者和生产者:
def connect_mq(app):# 初始化 RabbitMQ 实例rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=5672, username='guest', password='guest')# 在应用上下文中注册 RabbitMQ 实例app.config['RABBITMQ'] = rabbitmq# consume_mq(app)#thread = threading.Thread(target=consume_mq, args=(app,))# 启动线程thread.start()def consume_mq(app):# 启动消费者程序,开始接收和处理消息def callback(ch, method, properties, body):try:print(f"消息队列内容 {body.decode()}")# 处理rabbitMQ内容to_transcribe(body.decode())except Exception as e:print(str(e))ch.basic_ack(delivery_tag=method.delivery_tag)# 启动消费者程序,开始接收和处理消息with app.app_context():rabbitmq = current_app.config['RABBITMQ']rabbitmq.receive_messages('audio_queue', callback)
4 初始化消费者和生产者:
def create_app():app = Flask(__name__)connect_mq_v1(app)
5 其他地方使用生产者
class MessageHandler:"""处理存放音频,将所有的任务都放在MQ里面"""def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):# 文件夹名称self.dir_name = dir_name# 文件名称self.file_name = file_name# 文件上传类型self.request_type = request_type# 文件存储位置self.file_url = file_url# 客户端回调地址self.back_url = back_url# 唯一标识self.uuid_str = uuid_strdef send(self):""":param content_type:队列类型:param rpc:MQ对象:return:"""try:# 发送消息队列# rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')rabbitmq = current_app.config['RABBITMQ']rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))print("发送消息到mq成功,用于存放音频信息")except Exception as e:print(f"发送消息到mq服务失败,请检查, {e}")def to_json(self):_dict = self.__dict__return _dict
# 将请求体和uuid放到rabbitMQ中
MessageHandler(**dates).send()
相关文章:
flask整合rabbitMQ插件的方式
文章目录 二、Python-flask-rabbitMQ-插件方式整合引言具体步骤1 安装依赖:2 编写实体类:3 编写消费者和生产者:4 初始化消费者和生产者:5 其他地方使用生产者 二、Python-flask-rabbitMQ-插件方式整合 引言 当今互联网应用的高…...
【React】高频面试题
1. 简述下 React 的事件代理机制? React使用了一种称为“事件代理”(Event Delegation)的机制来处理事件。事件代理是指将事件处理程序绑定到组件的父级元素上,然后在需要处理事件的子元素上触发事件时,事件将被委托给…...
Java数据结构之稀疏数组
目录 线性结构与非线性结构线性结构非线性结构 稀疏数组应用场景 代码实现二维数组转稀疏数组稀疏数组转二维数组 线性结构与非线性结构 线性结构 数据结构分两种,线性与非线性,线性结构的数据元素之间存在一对一的关系。 一对一指的是每个数据元素都…...
迅为RK3568开发板RTMP推流之视频监控
1 搭建 RTMP 媒流体服务器 nginx-rtmp 是一个基于 nginx 的 RTMP 服务模块,是一个功能强大的流媒体服务器模块, 它提供了丰富的功能和灵活的配置选项,适用于构建各种规模的流媒体平台和应用。无论是搭建实时视频直播平台、点播系统或多屏互…...
利用CSRF或XSS攻击网站的例子
利用 CSRF 攻击网站的简单示例: 假设有一个在线银行应用,用户可以在其中执行转账操作。用户登录后,系统会生成一个包含转账信息的表单,用户需要填写表单来发起转账。这个表单如下所示: <form action"https:/…...
LeetCode讲解篇之113. 路径总和 II
文章目录 题目描述题解思路题解代码 题目描述 题解思路 深度优先遍历二叉树,遍历的同时记录路径,直到遍历到叶节点,若路径和为targetSum则添加到结果集中 题解代码 func pathSum(root *TreeNode, targetSum int) [][]int {var res make([…...
中国HR从业者现状是怎样的?应如何提升自己?
HR(Human Resource)解释为人力资源,现在统称为人力资源顾问,跟传统人事有本质区别。传统人事一般是和行政部做相类似的工作,比如招聘,培训,职员的考核,职员的薪酬,职员调动等。现代人力资源&…...
Promise笔记-同步回调-异步回调-JS中的异常error处理-Promis的理解和使用-基本使用-链式调用-七个关键问题
Promise笔记 1. 预备知识1.1 实例对象与函数对象1.2 两种类型的回调函数1. 同步回调2. 异步回调 1.3 JS中的异常error处理1. 错误的类型2. 错误处理(捕获与抛出)3. 错误对象 2.Promise的理解和使用2.1 Promise是什么1.理解Promise2.Promise 的状态3. Pro…...
计算机考研自命题(2)
1、C语言-字符串交替拼接 1、用C编程,将两个字符串数组存储实现交替连接如aaa和bbb两个字符连接成ababab 如aaa和baba 两个字符,连接成 abaaaba #include<stdio.h>/* 解题思路:将两个字符串交替拼接,定义三个数组࿰…...
ZKP6.1 Discrete-log-based Polynomial Commitments (Preliminary)
ZKP学习笔记 ZK-Learning MOOC课程笔记 Lecture 6: Discrete-log-based Polynomial Commitments (Yupeng Zhang) Recall How to build an efficient SNARK? A polynomial commitment scheme A polynomial interactive oracle proof (IOP) SNARK for general circuits Plo…...
五金经营小程序商城的作用体现在哪
对消费者而言,如今线上购买五金是很多人的选择,传统线下购买,不仅需要跑路,而且店内未必有所需品,但线上平台则一目了然购买所需品,本地/外地均可以触达到,同时还可对用户/会员进行高效管理&…...
今年这行情,不会自动化的要做好心理准备了
李强是一名软件测试工程师,入行之后在一家小型公司工作了五年。这段时间里,他主要负责手工测试和一些简单的自动化测试工作。由于公司项目也相对简单,他逐渐陷入了工作的舒适区,没有积极追求新的知识和技能。 然而随着身边朋友发展…...
汽车保养笔记
汽车保养笔记 汽车小保养汽车大保养五油:机油变速箱油刹车油转向助力油离合器油 四滤:机油滤芯更换空气滤芯更换空调滤芯更换汽油滤芯更换 三水防冻液(水)玻璃水电瓶水 其他刹车片球头减震器火花塞 4S店的4大套路---没必要清洗节气门更换火花塞和高压线圈…...
【斗破年番】官方改编用心了,彩鳞怀孕并未删,萧潇肯定登场,真相在丹药身上
【侵权联系删除】 【文/郑尔巴金】 斗破苍穹年番动画已经更新了,相信不少人都感觉到不可思议,萧炎跟随美杜莎女王回蛇人族的剧情,居然魔改成这样。好好的腹中孕育出新生命,变成了陨落心炎残余能量,不及时处理的话&…...
英语——分享篇——每日200词——3201-3400
3201——air-conditioning——[eərkəndɪʃnɪŋ]——n.空调设备;vt.给…装上空调——air-conditioning——air-condition空调(熟词)ing鹰(谐音)——空调设备的噪音让鹰不得安宁——The trains dont even have proper air-conditioning, grumbles Mr So. ——地铁…...
合并区间(C++解法)
题目 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数组需恰好覆盖输入中的所有区间 。 示例 1: 输入:intervals …...
CUDA学习笔记(十四) Constant Memory
转载至https://www.cnblogs.com/1024incn/tag/CUDA/ CONSTANT MEMORY constant Memory对于device来说只读但是对于host是可读可写。constant Memory和global Memory一样都位于DRAM,并且有一个独立的on-chip cache,比直接从constant Memory读取要快得多…...
使用MFC创建一个SaleSystem
目录 1、项目的创建: 2、项目的配置: 3、设置窗口属性: (1)、设置图标 1)、添加导入资源 2)、代码初始化图标 (2)、设置标题 (3)、设置窗口…...
grafana v10.1版本设置告警
1. 相关概念概述 如图所示,点击切换菜单标志,可以看到警报相关子选项。 警报规则:通过PromQL语句定义告警规则,即达到怎样的状态触发告警。 联络点: 设置当警报规则实例触发时,如何通知联系人,…...
Python+Requests+PyTest+Excel+Allure 接口自动化测试实战
本文主要介绍了PythonRequestsPyTestExcelAllure 接口自动化测试实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 Unittest是Python标准库中自带的单元测试框架…...
无法与IP建立连接,未能下载VSCode服务器
如题,在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈,发现是VSCode版本自动更新惹的祸!!! 在VSCode的帮助->关于这里发现前几天VSCode自动更新了,我的版本号变成了1.100.3 才导致了远程连接出…...
从深圳崛起的“机器之眼”:赴港乐动机器人的万亿赛道赶考路
进入2025年以来,尽管围绕人形机器人、具身智能等机器人赛道的质疑声不断,但全球市场热度依然高涨,入局者持续增加。 以国内市场为例,天眼查专业版数据显示,截至5月底,我国现存在业、存续状态的机器人相关企…...
Auto-Coder使用GPT-4o完成:在用TabPFN这个模型构建一个预测未来3天涨跌的分类任务
通过akshare库,获取股票数据,并生成TabPFN这个模型 可以识别、处理的格式,写一个完整的预处理示例,并构建一个预测未来 3 天股价涨跌的分类任务 用TabPFN这个模型构建一个预测未来 3 天股价涨跌的分类任务,进行预测并输…...
c++ 面试题(1)-----深度优先搜索(DFS)实现
操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...
Java 加密常用的各种算法及其选择
在数字化时代,数据安全至关重要,Java 作为广泛应用的编程语言,提供了丰富的加密算法来保障数据的保密性、完整性和真实性。了解这些常用加密算法及其适用场景,有助于开发者在不同的业务需求中做出正确的选择。 一、对称加密算法…...
Android 之 kotlin 语言学习笔记三(Kotlin-Java 互操作)
参考官方文档:https://developer.android.google.cn/kotlin/interop?hlzh-cn 一、Java(供 Kotlin 使用) 1、不得使用硬关键字 不要使用 Kotlin 的任何硬关键字作为方法的名称 或字段。允许使用 Kotlin 的软关键字、修饰符关键字和特殊标识…...
Android第十三次面试总结(四大 组件基础)
Activity生命周期和四大启动模式详解 一、Activity 生命周期 Activity 的生命周期由一系列回调方法组成,用于管理其创建、可见性、焦点和销毁过程。以下是核心方法及其调用时机: onCreate() 调用时机:Activity 首次创建时调用。…...
以光量子为例,详解量子获取方式
光量子技术获取量子比特可在室温下进行。该方式有望通过与名为硅光子学(silicon photonics)的光波导(optical waveguide)芯片制造技术和光纤等光通信技术相结合来实现量子计算机。量子力学中,光既是波又是粒子。光子本…...
【VLNs篇】07:NavRL—在动态环境中学习安全飞行
项目内容论文标题NavRL: 在动态环境中学习安全飞行 (NavRL: Learning Safe Flight in Dynamic Environments)核心问题解决无人机在包含静态和动态障碍物的复杂环境中进行安全、高效自主导航的挑战,克服传统方法和现有强化学习方法的局限性。核心算法基于近端策略优化…...
GO协程(Goroutine)问题总结
在使用Go语言来编写代码时,遇到的一些问题总结一下 [参考文档]:https://www.topgoer.com/%E5%B9%B6%E5%8F%91%E7%BC%96%E7%A8%8B/goroutine.html 1. main()函数默认的Goroutine 场景再现: 今天在看到这个教程的时候,在自己的电…...
