flask整合rabbitMQ插件的方式
文章目录
- 二、Python-flask-rabbitMQ-插件方式整合
- 引言
- 具体步骤
- 1 安装依赖:
- 2 编写实体类:
- 3 编写消费者和生产者:
- 4 初始化消费者和生产者:
- 5 其他地方使用生产者
二、Python-flask-rabbitMQ-插件方式整合
引言
当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。
本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。
首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。
同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。
此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。
通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。
总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。
具体步骤
1 安装依赖:
使用pip安装pika库:
pip install pika
2 编写实体类:
from queue import Queue
from threading import Lockimport pika# 定义交换机类型的枚举值
class ExchangeType:DEFAULT = 'default'DIRECT = "direct"FANOUT = "fanout"TOPIC = 'topic'class RabbitMQ:def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10):self.credentials = pika.PlainCredentials(username, password)self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials)self.connection_pool = Queue(pool_size) # 连接池,存储连接和信道self.lock = Lock() # 互斥锁,用于对连接池的访问进行同步for _ in range(pool_size):connection = self._create_connection()channel = connection.channel()self.connection_pool.put((connection, channel))def _create_connection(self):return pika.BlockingConnection(self.parameters)def get_channel(self):with self.lock:connection, channel = self.connection_pool.get() # 从连接池获取连接和信道return connection, channeldef release_channel(self, connection, channel):with self.lock:self.connection_pool.put((connection, channel)) # 将连接和信道放回连接池def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DEFAULT):connection, channel = self.get_channel()try:channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) # 声明交换机并指定类型channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message) # 发布消息finally:self.release_channel(connection, channel)def receive_messages(self, queue, callback):connection, channel = self.get_channel()try:channel.queue_declare(queue=queue, durable=True) # 声明队列并标记为持久化# channel.queue_purge(queue=queue) # 清空队列,以防之前的非持久化消息残留channel.basic_qos(prefetch_count=10) # 每次从 RabbitMQ 获取 10 条消息channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False) # 消费消息并设置回调函数channel.start_consuming() # 开始消费消息finally:self.release_channel(connection, channel)
3 编写消费者和生产者:
def connect_mq(app):# 初始化 RabbitMQ 实例rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=5672, username='guest', password='guest')# 在应用上下文中注册 RabbitMQ 实例app.config['RABBITMQ'] = rabbitmq# consume_mq(app)#thread = threading.Thread(target=consume_mq, args=(app,))# 启动线程thread.start()def consume_mq(app):# 启动消费者程序,开始接收和处理消息def callback(ch, method, properties, body):try:print(f"消息队列内容 {body.decode()}")# 处理rabbitMQ内容to_transcribe(body.decode())except Exception as e:print(str(e))ch.basic_ack(delivery_tag=method.delivery_tag)# 启动消费者程序,开始接收和处理消息with app.app_context():rabbitmq = current_app.config['RABBITMQ']rabbitmq.receive_messages('audio_queue', callback)
4 初始化消费者和生产者:
def create_app():app = Flask(__name__)connect_mq_v1(app)
5 其他地方使用生产者
class MessageHandler:"""处理存放音频,将所有的任务都放在MQ里面"""def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):# 文件夹名称self.dir_name = dir_name# 文件名称self.file_name = file_name# 文件上传类型self.request_type = request_type# 文件存储位置self.file_url = file_url# 客户端回调地址self.back_url = back_url# 唯一标识self.uuid_str = uuid_strdef send(self):""":param content_type:队列类型:param rpc:MQ对象:return:"""try:# 发送消息队列# rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')rabbitmq = current_app.config['RABBITMQ']rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))print("发送消息到mq成功,用于存放音频信息")except Exception as e:print(f"发送消息到mq服务失败,请检查, {e}")def to_json(self):_dict = self.__dict__return _dict
# 将请求体和uuid放到rabbitMQ中
MessageHandler(**dates).send()
相关文章:
flask整合rabbitMQ插件的方式
文章目录 二、Python-flask-rabbitMQ-插件方式整合引言具体步骤1 安装依赖:2 编写实体类:3 编写消费者和生产者:4 初始化消费者和生产者:5 其他地方使用生产者 二、Python-flask-rabbitMQ-插件方式整合 引言 当今互联网应用的高…...
【React】高频面试题
1. 简述下 React 的事件代理机制? React使用了一种称为“事件代理”(Event Delegation)的机制来处理事件。事件代理是指将事件处理程序绑定到组件的父级元素上,然后在需要处理事件的子元素上触发事件时,事件将被委托给…...
Java数据结构之稀疏数组
目录 线性结构与非线性结构线性结构非线性结构 稀疏数组应用场景 代码实现二维数组转稀疏数组稀疏数组转二维数组 线性结构与非线性结构 线性结构 数据结构分两种,线性与非线性,线性结构的数据元素之间存在一对一的关系。 一对一指的是每个数据元素都…...
迅为RK3568开发板RTMP推流之视频监控
1 搭建 RTMP 媒流体服务器 nginx-rtmp 是一个基于 nginx 的 RTMP 服务模块,是一个功能强大的流媒体服务器模块, 它提供了丰富的功能和灵活的配置选项,适用于构建各种规模的流媒体平台和应用。无论是搭建实时视频直播平台、点播系统或多屏互…...
利用CSRF或XSS攻击网站的例子
利用 CSRF 攻击网站的简单示例: 假设有一个在线银行应用,用户可以在其中执行转账操作。用户登录后,系统会生成一个包含转账信息的表单,用户需要填写表单来发起转账。这个表单如下所示: <form action"https:/…...
LeetCode讲解篇之113. 路径总和 II
文章目录 题目描述题解思路题解代码 题目描述 题解思路 深度优先遍历二叉树,遍历的同时记录路径,直到遍历到叶节点,若路径和为targetSum则添加到结果集中 题解代码 func pathSum(root *TreeNode, targetSum int) [][]int {var res make([…...
中国HR从业者现状是怎样的?应如何提升自己?
HR(Human Resource)解释为人力资源,现在统称为人力资源顾问,跟传统人事有本质区别。传统人事一般是和行政部做相类似的工作,比如招聘,培训,职员的考核,职员的薪酬,职员调动等。现代人力资源&…...
Promise笔记-同步回调-异步回调-JS中的异常error处理-Promis的理解和使用-基本使用-链式调用-七个关键问题
Promise笔记 1. 预备知识1.1 实例对象与函数对象1.2 两种类型的回调函数1. 同步回调2. 异步回调 1.3 JS中的异常error处理1. 错误的类型2. 错误处理(捕获与抛出)3. 错误对象 2.Promise的理解和使用2.1 Promise是什么1.理解Promise2.Promise 的状态3. Pro…...
计算机考研自命题(2)
1、C语言-字符串交替拼接 1、用C编程,将两个字符串数组存储实现交替连接如aaa和bbb两个字符连接成ababab 如aaa和baba 两个字符,连接成 abaaaba #include<stdio.h>/* 解题思路:将两个字符串交替拼接,定义三个数组࿰…...
ZKP6.1 Discrete-log-based Polynomial Commitments (Preliminary)
ZKP学习笔记 ZK-Learning MOOC课程笔记 Lecture 6: Discrete-log-based Polynomial Commitments (Yupeng Zhang) Recall How to build an efficient SNARK? A polynomial commitment scheme A polynomial interactive oracle proof (IOP) SNARK for general circuits Plo…...
五金经营小程序商城的作用体现在哪
对消费者而言,如今线上购买五金是很多人的选择,传统线下购买,不仅需要跑路,而且店内未必有所需品,但线上平台则一目了然购买所需品,本地/外地均可以触达到,同时还可对用户/会员进行高效管理&…...
今年这行情,不会自动化的要做好心理准备了
李强是一名软件测试工程师,入行之后在一家小型公司工作了五年。这段时间里,他主要负责手工测试和一些简单的自动化测试工作。由于公司项目也相对简单,他逐渐陷入了工作的舒适区,没有积极追求新的知识和技能。 然而随着身边朋友发展…...
汽车保养笔记
汽车保养笔记 汽车小保养汽车大保养五油:机油变速箱油刹车油转向助力油离合器油 四滤:机油滤芯更换空气滤芯更换空调滤芯更换汽油滤芯更换 三水防冻液(水)玻璃水电瓶水 其他刹车片球头减震器火花塞 4S店的4大套路---没必要清洗节气门更换火花塞和高压线圈…...
【斗破年番】官方改编用心了,彩鳞怀孕并未删,萧潇肯定登场,真相在丹药身上
【侵权联系删除】 【文/郑尔巴金】 斗破苍穹年番动画已经更新了,相信不少人都感觉到不可思议,萧炎跟随美杜莎女王回蛇人族的剧情,居然魔改成这样。好好的腹中孕育出新生命,变成了陨落心炎残余能量,不及时处理的话&…...
英语——分享篇——每日200词——3201-3400
3201——air-conditioning——[eərkəndɪʃnɪŋ]——n.空调设备;vt.给…装上空调——air-conditioning——air-condition空调(熟词)ing鹰(谐音)——空调设备的噪音让鹰不得安宁——The trains dont even have proper air-conditioning, grumbles Mr So. ——地铁…...
合并区间(C++解法)
题目 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数组需恰好覆盖输入中的所有区间 。 示例 1: 输入:intervals …...
CUDA学习笔记(十四) Constant Memory
转载至https://www.cnblogs.com/1024incn/tag/CUDA/ CONSTANT MEMORY constant Memory对于device来说只读但是对于host是可读可写。constant Memory和global Memory一样都位于DRAM,并且有一个独立的on-chip cache,比直接从constant Memory读取要快得多…...
使用MFC创建一个SaleSystem
目录 1、项目的创建: 2、项目的配置: 3、设置窗口属性: (1)、设置图标 1)、添加导入资源 2)、代码初始化图标 (2)、设置标题 (3)、设置窗口…...
grafana v10.1版本设置告警
1. 相关概念概述 如图所示,点击切换菜单标志,可以看到警报相关子选项。 警报规则:通过PromQL语句定义告警规则,即达到怎样的状态触发告警。 联络点: 设置当警报规则实例触发时,如何通知联系人,…...
Python+Requests+PyTest+Excel+Allure 接口自动化测试实战
本文主要介绍了PythonRequestsPyTestExcelAllure 接口自动化测试实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 Unittest是Python标准库中自带的单元测试框架…...
Spring IOC 源码学习 事务相关的 BeanDefinition 解析过程 (XML)副
从0构建WAV文件:读懂计算机文件的本质 虽然接触计算机有一段时间了,但是我的视野一直局限于一个较小的范围之内,往往只能看到于算法竞赛相关的内容,计算机各种文件在我看来十分复杂,认为构建他们并能达到目的是一件困难…...
记录一个使用AI开发企业官网的思路
背景 今天在开发一个企业官网,想使用AI来开发,记录一下AI系统提示词,供大家学习。 AI提示词如下 角色:你是一位资深的全栈开发专家,精通Vue 3.0技术栈和现代UI/UX设计,善于将品牌故事转化为具有感染力的数字…...
告别Python+Netmiko!Rust+NexusOps如何重塑网络自动化
# 🚀 告别PythonNetmiko!RustNexusOps如何重塑网络自动化> 作者:NexusOps技术团队 | 原创 | 转载请注明出处> 标签:网络自动化、Rust、Netmiko、网络运维、Python## 📋 文章目录- [一、前言:为什么需…...
大模型SLA必须包含的4类动态条款(负载突增弹性系数、多租户隔离保障、模型版本回滚SLA继承规则、安全合规中断豁免机制)
第一章:大模型工程化服务等级协议SLA设计 2026奇点智能技术大会(https://ml-summit.org) 大模型工程化落地的核心挑战之一,在于将非确定性推理能力封装为可度量、可保障、可运维的生产级服务。SLA设计不再是传统API响应延迟与可用性的简单延伸ÿ…...
Qwen3-TTS-12Hz-1.7B-VoiceDesign提示词工程:精准控制语音输出
Qwen3-TTS-12Hz-1.7B-VoiceDesign提示词工程:精准控制语音输出 用自然语言描述你心中的声音,让AI帮你实现 你有没有试过在脑子里想象一个特别的声音,却不知道怎么用技术参数来表达?比如想要一个"略带沙哑的成熟男声ÿ…...
金融可视化组件实战指南:美国线图、均线图与K线图的应用解析
1. 金融可视化三剑客:美国线图、均线图与K线图入门 第一次接触金融图表时,我被满屏的红绿柱子弄得头晕眼花。直到一位老交易员告诉我:"这些图表就像股票的心电图,读懂它们就能听见市场的心跳。"今天我们就来拆解金融领域…...
Vue3项目实战:5分钟搞定DWG文件在线预览(VisualizeJS+VSF流)
Vue3DWG文件在线预览实战:VisualizeJS与VSF流的高效集成指南 在工业设计、建筑规划和机械制造领域,DWG文件作为AutoCAD的标准格式,其在线预览需求日益增长。传统解决方案往往依赖专业桌面软件或复杂的服务端渲染,而现代Web技术已经…...
深入解析M.2 B Key接口在5G模块与(U)SIM卡电路设计中的关键应用
1. M.2 B Key接口与5G模块的完美结合 第一次接触M.2 B Key接口时,我完全被它的小巧和多功能性震惊了。这个看起来像迷你版SSD插槽的接口,竟然能承载5G模块这么复杂的通信功能。在实际项目中,我发现M.2 B Key接口特别适合嵌入式设备使用&#…...
打造沉浸式智能AI问答助手:Vue + UniApp 全端实战(支持 Markdown/公式/多模态交互)竿
OCP原则 ocp指开闭原则,对扩展开放,对修改关闭。是七大原则中最基本的一个原则。 依赖倒置原则(DIP) 什么是依赖倒置原则 核心是面向接口编程、面向抽象编程, 不是面向具体编程。 依赖倒置原则的目的 降低耦合度&#…...
使用Spring AI Alibaba构建智能体Agent拔
背景 在软件开发的漫长旅途中,"构建"这个词往往让人又爱又恨。爱的是,一键点击,代码变成产品,那是程序员最迷人的时刻;恨的是,维护那一堆乱糟糟的构建脚本,简直是噩梦。 在很多项目中…...
