flask整合rabbitMQ插件的方式
文章目录
- 二、Python-flask-rabbitMQ-插件方式整合
- 引言
- 具体步骤
- 1 安装依赖:
- 2 编写实体类:
- 3 编写消费者和生产者:
- 4 初始化消费者和生产者:
- 5 其他地方使用生产者
二、Python-flask-rabbitMQ-插件方式整合
引言
当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。
本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。
首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。
同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。
此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。
通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。
总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。
具体步骤
1 安装依赖:
使用pip安装pika库:
pip install pika
2 编写实体类:
from queue import Queue
from threading import Lockimport pika# 定义交换机类型的枚举值
class ExchangeType:DEFAULT = 'default'DIRECT = "direct"FANOUT = "fanout"TOPIC = 'topic'class RabbitMQ:def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10):self.credentials = pika.PlainCredentials(username, password)self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials)self.connection_pool = Queue(pool_size) # 连接池,存储连接和信道self.lock = Lock() # 互斥锁,用于对连接池的访问进行同步for _ in range(pool_size):connection = self._create_connection()channel = connection.channel()self.connection_pool.put((connection, channel))def _create_connection(self):return pika.BlockingConnection(self.parameters)def get_channel(self):with self.lock:connection, channel = self.connection_pool.get() # 从连接池获取连接和信道return connection, channeldef release_channel(self, connection, channel):with self.lock:self.connection_pool.put((connection, channel)) # 将连接和信道放回连接池def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DEFAULT):connection, channel = self.get_channel()try:channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) # 声明交换机并指定类型channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message) # 发布消息finally:self.release_channel(connection, channel)def receive_messages(self, queue, callback):connection, channel = self.get_channel()try:channel.queue_declare(queue=queue, durable=True) # 声明队列并标记为持久化# channel.queue_purge(queue=queue) # 清空队列,以防之前的非持久化消息残留channel.basic_qos(prefetch_count=10) # 每次从 RabbitMQ 获取 10 条消息channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False) # 消费消息并设置回调函数channel.start_consuming() # 开始消费消息finally:self.release_channel(connection, channel)
3 编写消费者和生产者:
def connect_mq(app):# 初始化 RabbitMQ 实例rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=5672, username='guest', password='guest')# 在应用上下文中注册 RabbitMQ 实例app.config['RABBITMQ'] = rabbitmq# consume_mq(app)#thread = threading.Thread(target=consume_mq, args=(app,))# 启动线程thread.start()def consume_mq(app):# 启动消费者程序,开始接收和处理消息def callback(ch, method, properties, body):try:print(f"消息队列内容 {body.decode()}")# 处理rabbitMQ内容to_transcribe(body.decode())except Exception as e:print(str(e))ch.basic_ack(delivery_tag=method.delivery_tag)# 启动消费者程序,开始接收和处理消息with app.app_context():rabbitmq = current_app.config['RABBITMQ']rabbitmq.receive_messages('audio_queue', callback)
4 初始化消费者和生产者:
def create_app():app = Flask(__name__)connect_mq_v1(app)
5 其他地方使用生产者
class MessageHandler:"""处理存放音频,将所有的任务都放在MQ里面"""def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):# 文件夹名称self.dir_name = dir_name# 文件名称self.file_name = file_name# 文件上传类型self.request_type = request_type# 文件存储位置self.file_url = file_url# 客户端回调地址self.back_url = back_url# 唯一标识self.uuid_str = uuid_strdef send(self):""":param content_type:队列类型:param rpc:MQ对象:return:"""try:# 发送消息队列# rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')rabbitmq = current_app.config['RABBITMQ']rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))print("发送消息到mq成功,用于存放音频信息")except Exception as e:print(f"发送消息到mq服务失败,请检查, {e}")def to_json(self):_dict = self.__dict__return _dict
# 将请求体和uuid放到rabbitMQ中
MessageHandler(**dates).send()
相关文章:
flask整合rabbitMQ插件的方式
文章目录 二、Python-flask-rabbitMQ-插件方式整合引言具体步骤1 安装依赖:2 编写实体类:3 编写消费者和生产者:4 初始化消费者和生产者:5 其他地方使用生产者 二、Python-flask-rabbitMQ-插件方式整合 引言 当今互联网应用的高…...

【React】高频面试题
1. 简述下 React 的事件代理机制? React使用了一种称为“事件代理”(Event Delegation)的机制来处理事件。事件代理是指将事件处理程序绑定到组件的父级元素上,然后在需要处理事件的子元素上触发事件时,事件将被委托给…...

Java数据结构之稀疏数组
目录 线性结构与非线性结构线性结构非线性结构 稀疏数组应用场景 代码实现二维数组转稀疏数组稀疏数组转二维数组 线性结构与非线性结构 线性结构 数据结构分两种,线性与非线性,线性结构的数据元素之间存在一对一的关系。 一对一指的是每个数据元素都…...

迅为RK3568开发板RTMP推流之视频监控
1 搭建 RTMP 媒流体服务器 nginx-rtmp 是一个基于 nginx 的 RTMP 服务模块,是一个功能强大的流媒体服务器模块, 它提供了丰富的功能和灵活的配置选项,适用于构建各种规模的流媒体平台和应用。无论是搭建实时视频直播平台、点播系统或多屏互…...
利用CSRF或XSS攻击网站的例子
利用 CSRF 攻击网站的简单示例: 假设有一个在线银行应用,用户可以在其中执行转账操作。用户登录后,系统会生成一个包含转账信息的表单,用户需要填写表单来发起转账。这个表单如下所示: <form action"https:/…...

LeetCode讲解篇之113. 路径总和 II
文章目录 题目描述题解思路题解代码 题目描述 题解思路 深度优先遍历二叉树,遍历的同时记录路径,直到遍历到叶节点,若路径和为targetSum则添加到结果集中 题解代码 func pathSum(root *TreeNode, targetSum int) [][]int {var res make([…...
中国HR从业者现状是怎样的?应如何提升自己?
HR(Human Resource)解释为人力资源,现在统称为人力资源顾问,跟传统人事有本质区别。传统人事一般是和行政部做相类似的工作,比如招聘,培训,职员的考核,职员的薪酬,职员调动等。现代人力资源&…...

Promise笔记-同步回调-异步回调-JS中的异常error处理-Promis的理解和使用-基本使用-链式调用-七个关键问题
Promise笔记 1. 预备知识1.1 实例对象与函数对象1.2 两种类型的回调函数1. 同步回调2. 异步回调 1.3 JS中的异常error处理1. 错误的类型2. 错误处理(捕获与抛出)3. 错误对象 2.Promise的理解和使用2.1 Promise是什么1.理解Promise2.Promise 的状态3. Pro…...
计算机考研自命题(2)
1、C语言-字符串交替拼接 1、用C编程,将两个字符串数组存储实现交替连接如aaa和bbb两个字符连接成ababab 如aaa和baba 两个字符,连接成 abaaaba #include<stdio.h>/* 解题思路:将两个字符串交替拼接,定义三个数组࿰…...

ZKP6.1 Discrete-log-based Polynomial Commitments (Preliminary)
ZKP学习笔记 ZK-Learning MOOC课程笔记 Lecture 6: Discrete-log-based Polynomial Commitments (Yupeng Zhang) Recall How to build an efficient SNARK? A polynomial commitment scheme A polynomial interactive oracle proof (IOP) SNARK for general circuits Plo…...

五金经营小程序商城的作用体现在哪
对消费者而言,如今线上购买五金是很多人的选择,传统线下购买,不仅需要跑路,而且店内未必有所需品,但线上平台则一目了然购买所需品,本地/外地均可以触达到,同时还可对用户/会员进行高效管理&…...

今年这行情,不会自动化的要做好心理准备了
李强是一名软件测试工程师,入行之后在一家小型公司工作了五年。这段时间里,他主要负责手工测试和一些简单的自动化测试工作。由于公司项目也相对简单,他逐渐陷入了工作的舒适区,没有积极追求新的知识和技能。 然而随着身边朋友发展…...
汽车保养笔记
汽车保养笔记 汽车小保养汽车大保养五油:机油变速箱油刹车油转向助力油离合器油 四滤:机油滤芯更换空气滤芯更换空调滤芯更换汽油滤芯更换 三水防冻液(水)玻璃水电瓶水 其他刹车片球头减震器火花塞 4S店的4大套路---没必要清洗节气门更换火花塞和高压线圈…...

【斗破年番】官方改编用心了,彩鳞怀孕并未删,萧潇肯定登场,真相在丹药身上
【侵权联系删除】 【文/郑尔巴金】 斗破苍穹年番动画已经更新了,相信不少人都感觉到不可思议,萧炎跟随美杜莎女王回蛇人族的剧情,居然魔改成这样。好好的腹中孕育出新生命,变成了陨落心炎残余能量,不及时处理的话&…...

英语——分享篇——每日200词——3201-3400
3201——air-conditioning——[eərkəndɪʃnɪŋ]——n.空调设备;vt.给…装上空调——air-conditioning——air-condition空调(熟词)ing鹰(谐音)——空调设备的噪音让鹰不得安宁——The trains dont even have proper air-conditioning, grumbles Mr So. ——地铁…...
合并区间(C++解法)
题目 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数组需恰好覆盖输入中的所有区间 。 示例 1: 输入:intervals …...

CUDA学习笔记(十四) Constant Memory
转载至https://www.cnblogs.com/1024incn/tag/CUDA/ CONSTANT MEMORY constant Memory对于device来说只读但是对于host是可读可写。constant Memory和global Memory一样都位于DRAM,并且有一个独立的on-chip cache,比直接从constant Memory读取要快得多…...

使用MFC创建一个SaleSystem
目录 1、项目的创建: 2、项目的配置: 3、设置窗口属性: (1)、设置图标 1)、添加导入资源 2)、代码初始化图标 (2)、设置标题 (3)、设置窗口…...

grafana v10.1版本设置告警
1. 相关概念概述 如图所示,点击切换菜单标志,可以看到警报相关子选项。 警报规则:通过PromQL语句定义告警规则,即达到怎样的状态触发告警。 联络点: 设置当警报规则实例触发时,如何通知联系人,…...

Python+Requests+PyTest+Excel+Allure 接口自动化测试实战
本文主要介绍了PythonRequestsPyTestExcelAllure 接口自动化测试实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 Unittest是Python标准库中自带的单元测试框架…...

linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八
现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet,点击确认后如下提示 最终上报fail 解决方法 内核升级导致,需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

Opencv中的addweighted函数
一.addweighted函数作用 addweighted()是OpenCV库中用于图像处理的函数,主要功能是将两个输入图像(尺寸和类型相同)按照指定的权重进行加权叠加(图像融合),并添加一个标量值&#x…...
css的定位(position)详解:相对定位 绝对定位 固定定位
在 CSS 中,元素的定位通过 position 属性控制,共有 5 种定位模式:static(静态定位)、relative(相对定位)、absolute(绝对定位)、fixed(固定定位)和…...
Java入门学习详细版(一)
大家好,Java 学习是一个系统学习的过程,核心原则就是“理论 实践 坚持”,并且需循序渐进,不可过于着急,本篇文章推出的这份详细入门学习资料将带大家从零基础开始,逐步掌握 Java 的核心概念和编程技能。 …...
「全栈技术解析」推客小程序系统开发:从架构设计到裂变增长的完整解决方案
在移动互联网营销竞争白热化的当下,推客小程序系统凭借其裂变传播、精准营销等特性,成为企业抢占市场的利器。本文将深度解析推客小程序系统开发的核心技术与实现路径,助力开发者打造具有市场竞争力的营销工具。 一、系统核心功能架构&…...

Python环境安装与虚拟环境配置详解
本文档旨在为Python开发者提供一站式的环境安装与虚拟环境配置指南,适用于Windows、macOS和Linux系统。无论你是初学者还是有经验的开发者,都能在此找到适合自己的环境搭建方法和常见问题的解决方案。 快速开始 一分钟快速安装与虚拟环境配置 # macOS/…...

【Elasticsearch基础】Elasticsearch批量操作(Bulk API)深度解析与实践指南
目录 1 Bulk API概述 1.1 什么是批量操作 1.2 Bulk API的优势 2 Bulk API的工作原理 2.1 请求处理流程 2.2 底层机制 3 Bulk API的使用方法 3.1 基本请求格式 3.2 操作类型示例 3.3 响应格式 4 Bulk API的最佳实践 4.1 批量大小优化 4.2 错误处理策略 4.3 性能调…...
Monorepo架构: 项目管理模式对比与考量
关于 monorepo 相关概念及项目管理模式 在软件开发中,尤其是前端项目,我们会涉及到不同的项目管理模式,这里先介绍几个重要的概念“monorepo”是当前较为热门的一种项目管理方式,虽然很多人可能听说过,但可能在实际项…...

STM32CubeMX-H7-19-ESP8266通信(中)--单片机控制ESP8266实现TCP地址通信
前言 上篇文章我们已经能够使用串口助手实现esp8266的几种通信,接下来我们使用单片机控制实现。这篇文章会附带教程,增加.c和,.h,把串口和定时器放到对应的编号,然后调用初始化就可以使用了。 先讲解,然后末尾再放源码…...