当前位置: 首页 > news >正文

flask整合rabbitMQ插件的方式

文章目录

  • 二、Python-flask-rabbitMQ-插件方式整合
    • 引言
    • 具体步骤
      • 1 安装依赖:
      • 2 编写实体类:
      • 3 编写消费者和生产者:
      • 4 初始化消费者和生产者:
      • 5 其他地方使用生产者

二、Python-flask-rabbitMQ-插件方式整合

引言

当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。

本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。

首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。

同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。

此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。

通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。

总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。

具体步骤

1 安装依赖:

使用pip安装pika库:

pip install pika

2 编写实体类:

from queue import Queue
from threading import Lockimport pika# 定义交换机类型的枚举值
class ExchangeType:DEFAULT = 'default'DIRECT = "direct"FANOUT = "fanout"TOPIC = 'topic'class RabbitMQ:def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10):self.credentials = pika.PlainCredentials(username, password)self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials)self.connection_pool = Queue(pool_size)  # 连接池,存储连接和信道self.lock = Lock()  # 互斥锁,用于对连接池的访问进行同步for _ in range(pool_size):connection = self._create_connection()channel = connection.channel()self.connection_pool.put((connection, channel))def _create_connection(self):return pika.BlockingConnection(self.parameters)def get_channel(self):with self.lock:connection, channel = self.connection_pool.get()  # 从连接池获取连接和信道return connection, channeldef release_channel(self, connection, channel):with self.lock:self.connection_pool.put((connection, channel))  # 将连接和信道放回连接池def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DEFAULT):connection, channel = self.get_channel()try:channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)  # 声明交换机并指定类型channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)  # 发布消息finally:self.release_channel(connection, channel)def receive_messages(self, queue, callback):connection, channel = self.get_channel()try:channel.queue_declare(queue=queue, durable=True)  # 声明队列并标记为持久化# channel.queue_purge(queue=queue)  # 清空队列,以防之前的非持久化消息残留channel.basic_qos(prefetch_count=10)  # 每次从 RabbitMQ 获取 10 条消息channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False)  # 消费消息并设置回调函数channel.start_consuming()  # 开始消费消息finally:self.release_channel(connection, channel)

3 编写消费者和生产者:

def connect_mq(app):# 初始化 RabbitMQ 实例rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=5672, username='guest', password='guest')# 在应用上下文中注册 RabbitMQ 实例app.config['RABBITMQ'] = rabbitmq# consume_mq(app)#thread = threading.Thread(target=consume_mq, args=(app,))# 启动线程thread.start()def consume_mq(app):# 启动消费者程序,开始接收和处理消息def callback(ch, method, properties, body):try:print(f"消息队列内容 {body.decode()}")# 处理rabbitMQ内容to_transcribe(body.decode())except Exception as e:print(str(e))ch.basic_ack(delivery_tag=method.delivery_tag)# 启动消费者程序,开始接收和处理消息with app.app_context():rabbitmq = current_app.config['RABBITMQ']rabbitmq.receive_messages('audio_queue', callback)

4 初始化消费者和生产者:

def create_app():app = Flask(__name__)connect_mq_v1(app)

5 其他地方使用生产者


class MessageHandler:"""处理存放音频,将所有的任务都放在MQ里面"""def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):# 文件夹名称self.dir_name = dir_name# 文件名称self.file_name = file_name# 文件上传类型self.request_type = request_type# 文件存储位置self.file_url = file_url# 客户端回调地址self.back_url = back_url# 唯一标识self.uuid_str = uuid_strdef send(self):""":param content_type:队列类型:param rpc:MQ对象:return:"""try:# 发送消息队列# rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')rabbitmq = current_app.config['RABBITMQ']rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))print("发送消息到mq成功,用于存放音频信息")except Exception as e:print(f"发送消息到mq服务失败,请检查, {e}")def to_json(self):_dict = self.__dict__return _dict
# 将请求体和uuid放到rabbitMQ中
MessageHandler(**dates).send()

相关文章:

flask整合rabbitMQ插件的方式

文章目录 二、Python-flask-rabbitMQ-插件方式整合引言具体步骤1 安装依赖:2 编写实体类:3 编写消费者和生产者:4 初始化消费者和生产者:5 其他地方使用生产者 二、Python-flask-rabbitMQ-插件方式整合 引言 当今互联网应用的高…...

【React】高频面试题

1. 简述下 React 的事件代理机制? React使用了一种称为“事件代理”(Event Delegation)的机制来处理事件。事件代理是指将事件处理程序绑定到组件的父级元素上,然后在需要处理事件的子元素上触发事件时,事件将被委托给…...

Java数据结构之稀疏数组

目录 线性结构与非线性结构线性结构非线性结构 稀疏数组应用场景 代码实现二维数组转稀疏数组稀疏数组转二维数组 线性结构与非线性结构 线性结构 数据结构分两种,线性与非线性,线性结构的数据元素之间存在一对一的关系。 一对一指的是每个数据元素都…...

迅为RK3568开发板RTMP推流之视频监控

1 搭建 RTMP 媒流体服务器 nginx-rtmp 是一个基于 nginx 的 RTMP 服务模块,是一个功能强大的流媒体服务器模块, 它提供了丰富的功能和灵活的配置选项,适用于构建各种规模的流媒体平台和应用。无论是搭建实时视频直播平台、点播系统或多屏互…...

利用CSRF或XSS攻击网站的例子

利用 CSRF 攻击网站的简单示例&#xff1a; 假设有一个在线银行应用&#xff0c;用户可以在其中执行转账操作。用户登录后&#xff0c;系统会生成一个包含转账信息的表单&#xff0c;用户需要填写表单来发起转账。这个表单如下所示&#xff1a; <form action"https:/…...

LeetCode讲解篇之113. 路径总和 II

文章目录 题目描述题解思路题解代码 题目描述 题解思路 深度优先遍历二叉树&#xff0c;遍历的同时记录路径&#xff0c;直到遍历到叶节点&#xff0c;若路径和为targetSum则添加到结果集中 题解代码 func pathSum(root *TreeNode, targetSum int) [][]int {var res make([…...

中国HR从业者现状是怎样的?应如何提升自己?

HR(Human Resource)解释为人力资源&#xff0c;现在统称为人力资源顾问&#xff0c;跟传统人事有本质区别。传统人事一般是和行政部做相类似的工作&#xff0c;比如招聘&#xff0c;培训&#xff0c;职员的考核&#xff0c;职员的薪酬&#xff0c;职员调动等。现代人力资源&…...

Promise笔记-同步回调-异步回调-JS中的异常error处理-Promis的理解和使用-基本使用-链式调用-七个关键问题

Promise笔记 1. 预备知识1.1 实例对象与函数对象1.2 两种类型的回调函数1. 同步回调2. 异步回调 1.3 JS中的异常error处理1. 错误的类型2. 错误处理&#xff08;捕获与抛出&#xff09;3. 错误对象 2.Promise的理解和使用2.1 Promise是什么1.理解Promise2.Promise 的状态3. Pro…...

计算机考研自命题(2)

1、C语言-字符串交替拼接 1、用C编程&#xff0c;将两个字符串数组存储实现交替连接如aaa和bbb两个字符连接成ababab 如aaa和baba 两个字符&#xff0c;连接成 abaaaba #include<stdio.h>/* 解题思路&#xff1a;将两个字符串交替拼接&#xff0c;定义三个数组&#xff0…...

ZKP6.1 Discrete-log-based Polynomial Commitments (Preliminary)

ZKP学习笔记 ZK-Learning MOOC课程笔记 Lecture 6: Discrete-log-based Polynomial Commitments (Yupeng Zhang) Recall How to build an efficient SNARK? A polynomial commitment scheme A polynomial interactive oracle proof (IOP) SNARK for general circuits Plo…...

五金经营小程序商城的作用体现在哪

对消费者而言&#xff0c;如今线上购买五金是很多人的选择&#xff0c;传统线下购买&#xff0c;不仅需要跑路&#xff0c;而且店内未必有所需品&#xff0c;但线上平台则一目了然购买所需品&#xff0c;本地/外地均可以触达到&#xff0c;同时还可对用户/会员进行高效管理&…...

今年这行情,不会自动化的要做好心理准备了

李强是一名软件测试工程师&#xff0c;入行之后在一家小型公司工作了五年。这段时间里&#xff0c;他主要负责手工测试和一些简单的自动化测试工作。由于公司项目也相对简单&#xff0c;他逐渐陷入了工作的舒适区&#xff0c;没有积极追求新的知识和技能。 然而随着身边朋友发展…...

汽车保养笔记

汽车保养笔记 汽车小保养汽车大保养五油&#xff1a;机油变速箱油刹车油转向助力油离合器油 四滤&#xff1a;机油滤芯更换空气滤芯更换空调滤芯更换汽油滤芯更换 三水防冻液(水)玻璃水电瓶水 其他刹车片球头减震器火花塞 4S店的4大套路---没必要清洗节气门更换火花塞和高压线圈…...

【斗破年番】官方改编用心了,彩鳞怀孕并未删,萧潇肯定登场,真相在丹药身上

【侵权联系删除】 【文/郑尔巴金】 斗破苍穹年番动画已经更新了&#xff0c;相信不少人都感觉到不可思议&#xff0c;萧炎跟随美杜莎女王回蛇人族的剧情&#xff0c;居然魔改成这样。好好的腹中孕育出新生命&#xff0c;变成了陨落心炎残余能量&#xff0c;不及时处理的话&…...

英语——分享篇——每日200词——3201-3400

3201——air-conditioning——[eərkəndɪʃnɪŋ]——n.空调设备&#xff1b;vt.给…装上空调——air-conditioning——air-condition空调(熟词)ing鹰(谐音)——空调设备的噪音让鹰不得安宁——The trains dont even have proper air-conditioning, grumbles Mr So. ——地铁…...

合并区间(C++解法)

题目 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 示例 1&#xff1a; 输入&#xff1a;intervals …...

CUDA学习笔记(十四) Constant Memory

转载至https://www.cnblogs.com/1024incn/tag/CUDA/ CONSTANT MEMORY constant Memory对于device来说只读但是对于host是可读可写。constant Memory和global Memory一样都位于DRAM&#xff0c;并且有一个独立的on-chip cache&#xff0c;比直接从constant Memory读取要快得多…...

使用MFC创建一个SaleSystem

目录 1、项目的创建&#xff1a; 2、项目的配置&#xff1a; 3、设置窗口属性&#xff1a; &#xff08;1&#xff09;、设置图标 1&#xff09;、添加导入资源 2&#xff09;、代码初始化图标 &#xff08;2&#xff09;、设置标题 &#xff08;3&#xff09;、设置窗口…...

grafana v10.1版本设置告警

1. 相关概念概述 如图所示&#xff0c;点击切换菜单标志&#xff0c;可以看到警报相关子选项。 警报规则&#xff1a;通过PromQL语句定义告警规则&#xff0c;即达到怎样的状态触发告警。 联络点&#xff1a; 设置当警报规则实例触发时&#xff0c;如何通知联系人&#xff0c;…...

Python+Requests+PyTest+Excel+Allure 接口自动化测试实战

本文主要介绍了PythonRequestsPyTestExcelAllure 接口自动化测试实战&#xff0c;文中通过示例代码介绍的非常详细&#xff0c;对大家的学习或者工作具有一定的参考学习价值&#xff0c;需要的朋友们下面随着小编来一起学习学习吧 Unittest是Python标准库中自带的单元测试框架…...

网络编程(Modbus进阶)

思维导图 Modbus RTU&#xff08;先学一点理论&#xff09; 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议&#xff0c;由 Modicon 公司&#xff08;现施耐德电气&#xff09;于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互

物理引擎&#xff08;Physics Engine&#xff09; 物理引擎 是一种通过计算机模拟物理规律&#xff08;如力学、碰撞、重力、流体动力学等&#xff09;的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互&#xff0c;广泛应用于 游戏开发、动画制作、虚…...

day52 ResNet18 CBAM

在深度学习的旅程中&#xff0c;我们不断探索如何提升模型的性能。今天&#xff0c;我将分享我在 ResNet18 模型中插入 CBAM&#xff08;Convolutional Block Attention Module&#xff09;模块&#xff0c;并采用分阶段微调策略的实践过程。通过这个过程&#xff0c;我不仅提升…...

Admin.Net中的消息通信SignalR解释

定义集线器接口 IOnlineUserHub public interface IOnlineUserHub {/// 在线用户列表Task OnlineUserList(OnlineUserList context);/// 强制下线Task ForceOffline(object context);/// 发布站内消息Task PublicNotice(SysNotice context);/// 接收消息Task ReceiveMessage(…...

【SpringBoot】100、SpringBoot中使用自定义注解+AOP实现参数自动解密

在实际项目中,用户注册、登录、修改密码等操作,都涉及到参数传输安全问题。所以我们需要在前端对账户、密码等敏感信息加密传输,在后端接收到数据后能自动解密。 1、引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId...

WEB3全栈开发——面试专业技能点P2智能合约开发(Solidity)

一、Solidity合约开发 下面是 Solidity 合约开发 的概念、代码示例及讲解&#xff0c;适合用作学习或写简历项目背景说明。 &#x1f9e0; 一、概念简介&#xff1a;Solidity 合约开发 Solidity 是一种专门为 以太坊&#xff08;Ethereum&#xff09;平台编写智能合约的高级编…...

Rapidio门铃消息FIFO溢出机制

关于RapidIO门铃消息FIFO的溢出机制及其与中断抖动的关系&#xff0c;以下是深入解析&#xff1a; 门铃FIFO溢出的本质 在RapidIO系统中&#xff0c;门铃消息FIFO是硬件控制器内部的缓冲区&#xff0c;用于临时存储接收到的门铃消息&#xff08;Doorbell Message&#xff09;。…...

Pinocchio 库详解及其在足式机器人上的应用

Pinocchio 库详解及其在足式机器人上的应用 Pinocchio (Pinocchio is not only a nose) 是一个开源的 C 库&#xff0c;专门用于快速计算机器人模型的正向运动学、逆向运动学、雅可比矩阵、动力学和动力学导数。它主要关注效率和准确性&#xff0c;并提供了一个通用的框架&…...

服务器--宝塔命令

一、宝塔面板安装命令 ⚠️ 必须使用 root 用户 或 sudo 权限执行&#xff01; sudo su - 1. CentOS 系统&#xff1a; yum install -y wget && wget -O install.sh http://download.bt.cn/install/install_6.0.sh && sh install.sh2. Ubuntu / Debian 系统…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)

前言&#xff1a; 最近在做行为检测相关的模型&#xff0c;用的是时空图卷积网络&#xff08;STGCN&#xff09;&#xff0c;但原有kinetic-400数据集数据质量较低&#xff0c;需要进行细粒度的标注&#xff0c;同时粗略搜了下已有开源工具基本都集中于图像分割这块&#xff0c…...