flask整合rabbitMQ插件的方式
文章目录
- 二、Python-flask-rabbitMQ-插件方式整合
- 引言
- 具体步骤
- 1 安装依赖:
- 2 编写实体类:
- 3 编写消费者和生产者:
- 4 初始化消费者和生产者:
- 5 其他地方使用生产者
二、Python-flask-rabbitMQ-插件方式整合
引言
当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。
本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。
首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。
同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。
此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。
通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。
总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。
具体步骤
1 安装依赖:
使用pip安装pika库:
pip install pika
2 编写实体类:
from queue import Queue
from threading import Lockimport pika# 定义交换机类型的枚举值
class ExchangeType:DEFAULT = 'default'DIRECT = "direct"FANOUT = "fanout"TOPIC = 'topic'class RabbitMQ:def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10):self.credentials = pika.PlainCredentials(username, password)self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials)self.connection_pool = Queue(pool_size) # 连接池,存储连接和信道self.lock = Lock() # 互斥锁,用于对连接池的访问进行同步for _ in range(pool_size):connection = self._create_connection()channel = connection.channel()self.connection_pool.put((connection, channel))def _create_connection(self):return pika.BlockingConnection(self.parameters)def get_channel(self):with self.lock:connection, channel = self.connection_pool.get() # 从连接池获取连接和信道return connection, channeldef release_channel(self, connection, channel):with self.lock:self.connection_pool.put((connection, channel)) # 将连接和信道放回连接池def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DEFAULT):connection, channel = self.get_channel()try:channel.exchange_declare(exchange=exchange, exchange_type=exchange_type) # 声明交换机并指定类型channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message) # 发布消息finally:self.release_channel(connection, channel)def receive_messages(self, queue, callback):connection, channel = self.get_channel()try:channel.queue_declare(queue=queue, durable=True) # 声明队列并标记为持久化# channel.queue_purge(queue=queue) # 清空队列,以防之前的非持久化消息残留channel.basic_qos(prefetch_count=10) # 每次从 RabbitMQ 获取 10 条消息channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False) # 消费消息并设置回调函数channel.start_consuming() # 开始消费消息finally:self.release_channel(connection, channel)
3 编写消费者和生产者:
def connect_mq(app):# 初始化 RabbitMQ 实例rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=5672, username='guest', password='guest')# 在应用上下文中注册 RabbitMQ 实例app.config['RABBITMQ'] = rabbitmq# consume_mq(app)#thread = threading.Thread(target=consume_mq, args=(app,))# 启动线程thread.start()def consume_mq(app):# 启动消费者程序,开始接收和处理消息def callback(ch, method, properties, body):try:print(f"消息队列内容 {body.decode()}")# 处理rabbitMQ内容to_transcribe(body.decode())except Exception as e:print(str(e))ch.basic_ack(delivery_tag=method.delivery_tag)# 启动消费者程序,开始接收和处理消息with app.app_context():rabbitmq = current_app.config['RABBITMQ']rabbitmq.receive_messages('audio_queue', callback)
4 初始化消费者和生产者:
def create_app():app = Flask(__name__)connect_mq_v1(app)
5 其他地方使用生产者
class MessageHandler:"""处理存放音频,将所有的任务都放在MQ里面"""def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):# 文件夹名称self.dir_name = dir_name# 文件名称self.file_name = file_name# 文件上传类型self.request_type = request_type# 文件存储位置self.file_url = file_url# 客户端回调地址self.back_url = back_url# 唯一标识self.uuid_str = uuid_strdef send(self):""":param content_type:队列类型:param rpc:MQ对象:return:"""try:# 发送消息队列# rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')rabbitmq = current_app.config['RABBITMQ']rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))print("发送消息到mq成功,用于存放音频信息")except Exception as e:print(f"发送消息到mq服务失败,请检查, {e}")def to_json(self):_dict = self.__dict__return _dict
# 将请求体和uuid放到rabbitMQ中
MessageHandler(**dates).send()
相关文章:
flask整合rabbitMQ插件的方式
文章目录 二、Python-flask-rabbitMQ-插件方式整合引言具体步骤1 安装依赖:2 编写实体类:3 编写消费者和生产者:4 初始化消费者和生产者:5 其他地方使用生产者 二、Python-flask-rabbitMQ-插件方式整合 引言 当今互联网应用的高…...
【React】高频面试题
1. 简述下 React 的事件代理机制? React使用了一种称为“事件代理”(Event Delegation)的机制来处理事件。事件代理是指将事件处理程序绑定到组件的父级元素上,然后在需要处理事件的子元素上触发事件时,事件将被委托给…...
Java数据结构之稀疏数组
目录 线性结构与非线性结构线性结构非线性结构 稀疏数组应用场景 代码实现二维数组转稀疏数组稀疏数组转二维数组 线性结构与非线性结构 线性结构 数据结构分两种,线性与非线性,线性结构的数据元素之间存在一对一的关系。 一对一指的是每个数据元素都…...
迅为RK3568开发板RTMP推流之视频监控
1 搭建 RTMP 媒流体服务器 nginx-rtmp 是一个基于 nginx 的 RTMP 服务模块,是一个功能强大的流媒体服务器模块, 它提供了丰富的功能和灵活的配置选项,适用于构建各种规模的流媒体平台和应用。无论是搭建实时视频直播平台、点播系统或多屏互…...
利用CSRF或XSS攻击网站的例子
利用 CSRF 攻击网站的简单示例: 假设有一个在线银行应用,用户可以在其中执行转账操作。用户登录后,系统会生成一个包含转账信息的表单,用户需要填写表单来发起转账。这个表单如下所示: <form action"https:/…...
LeetCode讲解篇之113. 路径总和 II
文章目录 题目描述题解思路题解代码 题目描述 题解思路 深度优先遍历二叉树,遍历的同时记录路径,直到遍历到叶节点,若路径和为targetSum则添加到结果集中 题解代码 func pathSum(root *TreeNode, targetSum int) [][]int {var res make([…...
中国HR从业者现状是怎样的?应如何提升自己?
HR(Human Resource)解释为人力资源,现在统称为人力资源顾问,跟传统人事有本质区别。传统人事一般是和行政部做相类似的工作,比如招聘,培训,职员的考核,职员的薪酬,职员调动等。现代人力资源&…...
Promise笔记-同步回调-异步回调-JS中的异常error处理-Promis的理解和使用-基本使用-链式调用-七个关键问题
Promise笔记 1. 预备知识1.1 实例对象与函数对象1.2 两种类型的回调函数1. 同步回调2. 异步回调 1.3 JS中的异常error处理1. 错误的类型2. 错误处理(捕获与抛出)3. 错误对象 2.Promise的理解和使用2.1 Promise是什么1.理解Promise2.Promise 的状态3. Pro…...
计算机考研自命题(2)
1、C语言-字符串交替拼接 1、用C编程,将两个字符串数组存储实现交替连接如aaa和bbb两个字符连接成ababab 如aaa和baba 两个字符,连接成 abaaaba #include<stdio.h>/* 解题思路:将两个字符串交替拼接,定义三个数组࿰…...
ZKP6.1 Discrete-log-based Polynomial Commitments (Preliminary)
ZKP学习笔记 ZK-Learning MOOC课程笔记 Lecture 6: Discrete-log-based Polynomial Commitments (Yupeng Zhang) Recall How to build an efficient SNARK? A polynomial commitment scheme A polynomial interactive oracle proof (IOP) SNARK for general circuits Plo…...
五金经营小程序商城的作用体现在哪
对消费者而言,如今线上购买五金是很多人的选择,传统线下购买,不仅需要跑路,而且店内未必有所需品,但线上平台则一目了然购买所需品,本地/外地均可以触达到,同时还可对用户/会员进行高效管理&…...
今年这行情,不会自动化的要做好心理准备了
李强是一名软件测试工程师,入行之后在一家小型公司工作了五年。这段时间里,他主要负责手工测试和一些简单的自动化测试工作。由于公司项目也相对简单,他逐渐陷入了工作的舒适区,没有积极追求新的知识和技能。 然而随着身边朋友发展…...
汽车保养笔记
汽车保养笔记 汽车小保养汽车大保养五油:机油变速箱油刹车油转向助力油离合器油 四滤:机油滤芯更换空气滤芯更换空调滤芯更换汽油滤芯更换 三水防冻液(水)玻璃水电瓶水 其他刹车片球头减震器火花塞 4S店的4大套路---没必要清洗节气门更换火花塞和高压线圈…...
【斗破年番】官方改编用心了,彩鳞怀孕并未删,萧潇肯定登场,真相在丹药身上
【侵权联系删除】 【文/郑尔巴金】 斗破苍穹年番动画已经更新了,相信不少人都感觉到不可思议,萧炎跟随美杜莎女王回蛇人族的剧情,居然魔改成这样。好好的腹中孕育出新生命,变成了陨落心炎残余能量,不及时处理的话&…...
英语——分享篇——每日200词——3201-3400
3201——air-conditioning——[eərkəndɪʃnɪŋ]——n.空调设备;vt.给…装上空调——air-conditioning——air-condition空调(熟词)ing鹰(谐音)——空调设备的噪音让鹰不得安宁——The trains dont even have proper air-conditioning, grumbles Mr So. ——地铁…...
合并区间(C++解法)
题目 以数组 intervals 表示若干个区间的集合,其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间,并返回 一个不重叠的区间数组,该数组需恰好覆盖输入中的所有区间 。 示例 1: 输入:intervals …...
CUDA学习笔记(十四) Constant Memory
转载至https://www.cnblogs.com/1024incn/tag/CUDA/ CONSTANT MEMORY constant Memory对于device来说只读但是对于host是可读可写。constant Memory和global Memory一样都位于DRAM,并且有一个独立的on-chip cache,比直接从constant Memory读取要快得多…...
使用MFC创建一个SaleSystem
目录 1、项目的创建: 2、项目的配置: 3、设置窗口属性: (1)、设置图标 1)、添加导入资源 2)、代码初始化图标 (2)、设置标题 (3)、设置窗口…...
grafana v10.1版本设置告警
1. 相关概念概述 如图所示,点击切换菜单标志,可以看到警报相关子选项。 警报规则:通过PromQL语句定义告警规则,即达到怎样的状态触发告警。 联络点: 设置当警报规则实例触发时,如何通知联系人,…...
Python+Requests+PyTest+Excel+Allure 接口自动化测试实战
本文主要介绍了PythonRequestsPyTestExcelAllure 接口自动化测试实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧 Unittest是Python标准库中自带的单元测试框架…...
Appium+python自动化(十六)- ADB命令
简介 Android 调试桥(adb)是多种用途的工具,该工具可以帮助你你管理设备或模拟器 的状态。 adb ( Android Debug Bridge)是一个通用命令行工具,其允许您与模拟器实例或连接的 Android 设备进行通信。它可为各种设备操作提供便利,如安装和调试…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
用机器学习破解新能源领域的“弃风”难题
音乐发烧友深有体会,玩音乐的本质就是玩电网。火电声音偏暖,水电偏冷,风电偏空旷。至于太阳能发的电,则略显朦胧和单薄。 不知你是否有感觉,近两年家里的音响声音越来越冷,听起来越来越单薄? —…...
scikit-learn机器学习
# 同时添加如下代码, 这样每次环境(kernel)启动的时候只要运行下方代码即可: # Also add the following code, # so that every time the environment (kernel) starts, # just run the following code: import sys sys.path.append(/home/aistudio/external-libraries)机…...
【JavaSE】多线程基础学习笔记
多线程基础 -线程相关概念 程序(Program) 是为完成特定任务、用某种语言编写的一组指令的集合简单的说:就是我们写的代码 进程 进程是指运行中的程序,比如我们使用QQ,就启动了一个进程,操作系统就会为该进程分配内存…...
tomcat指定使用的jdk版本
说明 有时候需要对tomcat配置指定的jdk版本号,此时,我们可以通过以下方式进行配置 设置方式 找到tomcat的bin目录中的setclasspath.bat。如果是linux系统则是setclasspath.sh set JAVA_HOMEC:\Program Files\Java\jdk8 set JRE_HOMEC:\Program Files…...
DeepSeek越强,Kimi越慌?
被DeepSeek吊打的Kimi,还有多少人在用? 去年,月之暗面创始人杨植麟别提有多风光了。90后清华学霸,国产大模型六小虎之一,手握十几亿美金的融资。旗下的AI助手Kimi烧钱如流水,单月光是投流就花费2个亿。 疯…...
【java面试】微服务篇
【java面试】微服务篇 一、总体框架二、Springcloud(一)Springcloud五大组件(二)服务注册和发现1、Eureka2、Nacos (三)负载均衡1、Ribbon负载均衡流程2、Ribbon负载均衡策略3、自定义负载均衡策略4、总结 …...
Appium下载安装配置保姆教程(图文详解)
目录 一、Appium软件介绍 1.特点 2.工作原理 3.应用场景 二、环境准备 安装 Node.js 安装 Appium 安装 JDK 安装 Android SDK 安装Python及依赖包 三、安装教程 1.Node.js安装 1.1.下载Node 1.2.安装程序 1.3.配置npm仓储和缓存 1.4. 配置环境 1.5.测试Node.j…...
简约商务通用宣传年终总结12套PPT模版分享
IOS风格企业宣传PPT模版,年终工作总结PPT模版,简约精致扁平化商务通用动画PPT模版,素雅商务PPT模版 简约商务通用宣传年终总结12套PPT模版分享:商务通用年终总结类PPT模版https://pan.quark.cn/s/ece1e252d7df...
