当前位置: 首页 > news >正文

flask整合rabbitMQ插件的方式

文章目录

  • 二、Python-flask-rabbitMQ-插件方式整合
    • 引言
    • 具体步骤
      • 1 安装依赖:
      • 2 编写实体类:
      • 3 编写消费者和生产者:
      • 4 初始化消费者和生产者:
      • 5 其他地方使用生产者

二、Python-flask-rabbitMQ-插件方式整合

引言

当今互联网应用的高并发场景下,如何保证可靠的消息传递和高效的处理成为了一项重要的挑战。在这种情况下,RabbitMQ作为一种可靠的消息队列系统,被广泛应用于各个领域。

本文将介绍如何使用RabbitMQ插件整合Flask框架,实现并发性生产者和消费者的兼容。通过这种方式,我们可以利用RabbitMQ的优势来确保消息的可靠传递,并提高应用程序的处理能力。

首先,我们需要在Flask应用程序中引入RabbitMQ插件。可以使用Pika库来连接和操作RabbitMQ。通过创建一个连接池管理连接对象,我们可以避免频繁地创建和销毁连接,提高效率。

同时,为了处理高并发的生产者,我们可以使用批量发送消息的方式来减少通道创建和消息发布的开销。通过设置缓冲区来收集一定数量或一定时间间隔内的消息,然后批量发送,可以更有效地利用资源。

此外,对于高并发的消费者,我们可以考虑使用异步的方式来处理消息。通过将消息发送任务交给后台线程或异步任务队列处理,可以避免请求的堵塞,提高应用程序的并发能力。

通过以上的优化方案,我们可以在Flask应用程序中充分利用RabbitMQ的功能,并且兼容高并发的生产者和消费者。这将帮助我们构建更可靠、高效的消息队列系统,应对日益增长的并发访问需求。

总之,使用RabbitMQ插件整合Flask框架,并采用优化方案来兼容并发性生产者和消费者,是构建可靠、高效消息传递系统的关键一步。通过这种方式,我们能够更好地应对高并发场景下的挑战,提升应用程序的性能和稳定性。

具体步骤

1 安装依赖:

使用pip安装pika库:

pip install pika

2 编写实体类:

from queue import Queue
from threading import Lockimport pika# 定义交换机类型的枚举值
class ExchangeType:DEFAULT = 'default'DIRECT = "direct"FANOUT = "fanout"TOPIC = 'topic'class RabbitMQ:def __init__(self, host='localhost', port=5672, username='guest', password='guest', pool_size=10):self.credentials = pika.PlainCredentials(username, password)self.parameters = pika.ConnectionParameters(host=host, port=port, credentials=self.credentials)self.connection_pool = Queue(pool_size)  # 连接池,存储连接和信道self.lock = Lock()  # 互斥锁,用于对连接池的访问进行同步for _ in range(pool_size):connection = self._create_connection()channel = connection.channel()self.connection_pool.put((connection, channel))def _create_connection(self):return pika.BlockingConnection(self.parameters)def get_channel(self):with self.lock:connection, channel = self.connection_pool.get()  # 从连接池获取连接和信道return connection, channeldef release_channel(self, connection, channel):with self.lock:self.connection_pool.put((connection, channel))  # 将连接和信道放回连接池def send_message(self, exchange, routing_key, message, exchange_type=ExchangeType.DEFAULT):connection, channel = self.get_channel()try:channel.exchange_declare(exchange=exchange, exchange_type=exchange_type)  # 声明交换机并指定类型channel.basic_publish(exchange=exchange, routing_key=routing_key, body=message)  # 发布消息finally:self.release_channel(connection, channel)def receive_messages(self, queue, callback):connection, channel = self.get_channel()try:channel.queue_declare(queue=queue, durable=True)  # 声明队列并标记为持久化# channel.queue_purge(queue=queue)  # 清空队列,以防之前的非持久化消息残留channel.basic_qos(prefetch_count=10)  # 每次从 RabbitMQ 获取 10 条消息channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=False)  # 消费消息并设置回调函数channel.start_consuming()  # 开始消费消息finally:self.release_channel(connection, channel)

3 编写消费者和生产者:

def connect_mq(app):# 初始化 RabbitMQ 实例rabbitmq = RabbitMQ(host=app.config['RABBITMQ_HOST'], port=5672, username='guest', password='guest')# 在应用上下文中注册 RabbitMQ 实例app.config['RABBITMQ'] = rabbitmq# consume_mq(app)#thread = threading.Thread(target=consume_mq, args=(app,))# 启动线程thread.start()def consume_mq(app):# 启动消费者程序,开始接收和处理消息def callback(ch, method, properties, body):try:print(f"消息队列内容 {body.decode()}")# 处理rabbitMQ内容to_transcribe(body.decode())except Exception as e:print(str(e))ch.basic_ack(delivery_tag=method.delivery_tag)# 启动消费者程序,开始接收和处理消息with app.app_context():rabbitmq = current_app.config['RABBITMQ']rabbitmq.receive_messages('audio_queue', callback)

4 初始化消费者和生产者:

def create_app():app = Flask(__name__)connect_mq_v1(app)

5 其他地方使用生产者


class MessageHandler:"""处理存放音频,将所有的任务都放在MQ里面"""def __init__(self, dir_name,uuid_str, back_url, file_url, request_type, file_name, *args, **kwargs):# 文件夹名称self.dir_name = dir_name# 文件名称self.file_name = file_name# 文件上传类型self.request_type = request_type# 文件存储位置self.file_url = file_url# 客户端回调地址self.back_url = back_url# 唯一标识self.uuid_str = uuid_strdef send(self):""":param content_type:队列类型:param rpc:MQ对象:return:"""try:# 发送消息队列# rpc.send_expire(body=json.dumps(self.to_json()), exchange='audio_queue', key='audio_queue')rabbitmq = current_app.config['RABBITMQ']rabbitmq.send_message('audio_queue', 'audio_queue', json.dumps(self.to_json()))print("发送消息到mq成功,用于存放音频信息")except Exception as e:print(f"发送消息到mq服务失败,请检查, {e}")def to_json(self):_dict = self.__dict__return _dict
# 将请求体和uuid放到rabbitMQ中
MessageHandler(**dates).send()

相关文章:

flask整合rabbitMQ插件的方式

文章目录 二、Python-flask-rabbitMQ-插件方式整合引言具体步骤1 安装依赖:2 编写实体类:3 编写消费者和生产者:4 初始化消费者和生产者:5 其他地方使用生产者 二、Python-flask-rabbitMQ-插件方式整合 引言 当今互联网应用的高…...

【React】高频面试题

1. 简述下 React 的事件代理机制? React使用了一种称为“事件代理”(Event Delegation)的机制来处理事件。事件代理是指将事件处理程序绑定到组件的父级元素上,然后在需要处理事件的子元素上触发事件时,事件将被委托给…...

Java数据结构之稀疏数组

目录 线性结构与非线性结构线性结构非线性结构 稀疏数组应用场景 代码实现二维数组转稀疏数组稀疏数组转二维数组 线性结构与非线性结构 线性结构 数据结构分两种,线性与非线性,线性结构的数据元素之间存在一对一的关系。 一对一指的是每个数据元素都…...

迅为RK3568开发板RTMP推流之视频监控

1 搭建 RTMP 媒流体服务器 nginx-rtmp 是一个基于 nginx 的 RTMP 服务模块,是一个功能强大的流媒体服务器模块, 它提供了丰富的功能和灵活的配置选项,适用于构建各种规模的流媒体平台和应用。无论是搭建实时视频直播平台、点播系统或多屏互…...

利用CSRF或XSS攻击网站的例子

利用 CSRF 攻击网站的简单示例&#xff1a; 假设有一个在线银行应用&#xff0c;用户可以在其中执行转账操作。用户登录后&#xff0c;系统会生成一个包含转账信息的表单&#xff0c;用户需要填写表单来发起转账。这个表单如下所示&#xff1a; <form action"https:/…...

LeetCode讲解篇之113. 路径总和 II

文章目录 题目描述题解思路题解代码 题目描述 题解思路 深度优先遍历二叉树&#xff0c;遍历的同时记录路径&#xff0c;直到遍历到叶节点&#xff0c;若路径和为targetSum则添加到结果集中 题解代码 func pathSum(root *TreeNode, targetSum int) [][]int {var res make([…...

中国HR从业者现状是怎样的?应如何提升自己?

HR(Human Resource)解释为人力资源&#xff0c;现在统称为人力资源顾问&#xff0c;跟传统人事有本质区别。传统人事一般是和行政部做相类似的工作&#xff0c;比如招聘&#xff0c;培训&#xff0c;职员的考核&#xff0c;职员的薪酬&#xff0c;职员调动等。现代人力资源&…...

Promise笔记-同步回调-异步回调-JS中的异常error处理-Promis的理解和使用-基本使用-链式调用-七个关键问题

Promise笔记 1. 预备知识1.1 实例对象与函数对象1.2 两种类型的回调函数1. 同步回调2. 异步回调 1.3 JS中的异常error处理1. 错误的类型2. 错误处理&#xff08;捕获与抛出&#xff09;3. 错误对象 2.Promise的理解和使用2.1 Promise是什么1.理解Promise2.Promise 的状态3. Pro…...

计算机考研自命题(2)

1、C语言-字符串交替拼接 1、用C编程&#xff0c;将两个字符串数组存储实现交替连接如aaa和bbb两个字符连接成ababab 如aaa和baba 两个字符&#xff0c;连接成 abaaaba #include<stdio.h>/* 解题思路&#xff1a;将两个字符串交替拼接&#xff0c;定义三个数组&#xff0…...

ZKP6.1 Discrete-log-based Polynomial Commitments (Preliminary)

ZKP学习笔记 ZK-Learning MOOC课程笔记 Lecture 6: Discrete-log-based Polynomial Commitments (Yupeng Zhang) Recall How to build an efficient SNARK? A polynomial commitment scheme A polynomial interactive oracle proof (IOP) SNARK for general circuits Plo…...

五金经营小程序商城的作用体现在哪

对消费者而言&#xff0c;如今线上购买五金是很多人的选择&#xff0c;传统线下购买&#xff0c;不仅需要跑路&#xff0c;而且店内未必有所需品&#xff0c;但线上平台则一目了然购买所需品&#xff0c;本地/外地均可以触达到&#xff0c;同时还可对用户/会员进行高效管理&…...

今年这行情,不会自动化的要做好心理准备了

李强是一名软件测试工程师&#xff0c;入行之后在一家小型公司工作了五年。这段时间里&#xff0c;他主要负责手工测试和一些简单的自动化测试工作。由于公司项目也相对简单&#xff0c;他逐渐陷入了工作的舒适区&#xff0c;没有积极追求新的知识和技能。 然而随着身边朋友发展…...

汽车保养笔记

汽车保养笔记 汽车小保养汽车大保养五油&#xff1a;机油变速箱油刹车油转向助力油离合器油 四滤&#xff1a;机油滤芯更换空气滤芯更换空调滤芯更换汽油滤芯更换 三水防冻液(水)玻璃水电瓶水 其他刹车片球头减震器火花塞 4S店的4大套路---没必要清洗节气门更换火花塞和高压线圈…...

【斗破年番】官方改编用心了,彩鳞怀孕并未删,萧潇肯定登场,真相在丹药身上

【侵权联系删除】 【文/郑尔巴金】 斗破苍穹年番动画已经更新了&#xff0c;相信不少人都感觉到不可思议&#xff0c;萧炎跟随美杜莎女王回蛇人族的剧情&#xff0c;居然魔改成这样。好好的腹中孕育出新生命&#xff0c;变成了陨落心炎残余能量&#xff0c;不及时处理的话&…...

英语——分享篇——每日200词——3201-3400

3201——air-conditioning——[eərkəndɪʃnɪŋ]——n.空调设备&#xff1b;vt.给…装上空调——air-conditioning——air-condition空调(熟词)ing鹰(谐音)——空调设备的噪音让鹰不得安宁——The trains dont even have proper air-conditioning, grumbles Mr So. ——地铁…...

合并区间(C++解法)

题目 以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 示例 1&#xff1a; 输入&#xff1a;intervals …...

CUDA学习笔记(十四) Constant Memory

转载至https://www.cnblogs.com/1024incn/tag/CUDA/ CONSTANT MEMORY constant Memory对于device来说只读但是对于host是可读可写。constant Memory和global Memory一样都位于DRAM&#xff0c;并且有一个独立的on-chip cache&#xff0c;比直接从constant Memory读取要快得多…...

使用MFC创建一个SaleSystem

目录 1、项目的创建&#xff1a; 2、项目的配置&#xff1a; 3、设置窗口属性&#xff1a; &#xff08;1&#xff09;、设置图标 1&#xff09;、添加导入资源 2&#xff09;、代码初始化图标 &#xff08;2&#xff09;、设置标题 &#xff08;3&#xff09;、设置窗口…...

grafana v10.1版本设置告警

1. 相关概念概述 如图所示&#xff0c;点击切换菜单标志&#xff0c;可以看到警报相关子选项。 警报规则&#xff1a;通过PromQL语句定义告警规则&#xff0c;即达到怎样的状态触发告警。 联络点&#xff1a; 设置当警报规则实例触发时&#xff0c;如何通知联系人&#xff0c;…...

Python+Requests+PyTest+Excel+Allure 接口自动化测试实战

本文主要介绍了PythonRequestsPyTestExcelAllure 接口自动化测试实战&#xff0c;文中通过示例代码介绍的非常详细&#xff0c;对大家的学习或者工作具有一定的参考学习价值&#xff0c;需要的朋友们下面随着小编来一起学习学习吧 Unittest是Python标准库中自带的单元测试框架…...

【网络】每天掌握一个Linux命令 - iftop

在Linux系统中&#xff0c;iftop是网络管理的得力助手&#xff0c;能实时监控网络流量、连接情况等&#xff0c;帮助排查网络异常。接下来从多方面详细介绍它。 目录 【网络】每天掌握一个Linux命令 - iftop工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景…...

树莓派超全系列教程文档--(61)树莓派摄像头高级使用方法

树莓派摄像头高级使用方法 配置通过调谐文件来调整相机行为 使用多个摄像头安装 libcam 和 rpicam-apps依赖关系开发包 文章来源&#xff1a; http://raspberry.dns8844.cn/documentation 原文网址 配置 大多数用例自动工作&#xff0c;无需更改相机配置。但是&#xff0c;一…...

Spring Boot 实现流式响应(兼容 2.7.x)

在实际开发中&#xff0c;我们可能会遇到一些流式数据处理的场景&#xff0c;比如接收来自上游接口的 Server-Sent Events&#xff08;SSE&#xff09; 或 流式 JSON 内容&#xff0c;并将其原样中转给前端页面或客户端。这种情况下&#xff0c;传统的 RestTemplate 缓存机制会…...

MongoDB学习和应用(高效的非关系型数据库)

一丶 MongoDB简介 对于社交类软件的功能&#xff0c;我们需要对它的功能特点进行分析&#xff1a; 数据量会随着用户数增大而增大读多写少价值较低非好友看不到其动态信息地理位置的查询… 针对以上特点进行分析各大存储工具&#xff1a; mysql&#xff1a;关系型数据库&am…...

线程同步:确保多线程程序的安全与高效!

全文目录&#xff1a; 开篇语前序前言第一部分&#xff1a;线程同步的概念与问题1.1 线程同步的概念1.2 线程同步的问题1.3 线程同步的解决方案 第二部分&#xff1a;synchronized关键字的使用2.1 使用 synchronized修饰方法2.2 使用 synchronized修饰代码块 第三部分&#xff…...

最新SpringBoot+SpringCloud+Nacos微服务框架分享

文章目录 前言一、服务规划二、架构核心1.cloud的pom2.gateway的异常handler3.gateway的filter4、admin的pom5、admin的登录核心 三、code-helper分享总结 前言 最近有个活蛮赶的&#xff0c;根据Excel列的需求预估的工时直接打骨折&#xff0c;不要问我为什么&#xff0c;主要…...

鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个生活电费的缴纳和查询小程序

一、项目初始化与配置 1. 创建项目 ohpm init harmony/utility-payment-app 2. 配置权限 // module.json5 {"requestPermissions": [{"name": "ohos.permission.INTERNET"},{"name": "ohos.permission.GET_NETWORK_INFO"…...

AI编程--插件对比分析:CodeRider、GitHub Copilot及其他

AI编程插件对比分析&#xff1a;CodeRider、GitHub Copilot及其他 随着人工智能技术的快速发展&#xff0c;AI编程插件已成为提升开发者生产力的重要工具。CodeRider和GitHub Copilot作为市场上的领先者&#xff0c;分别以其独特的特性和生态系统吸引了大量开发者。本文将从功…...

企业如何增强终端安全?

在数字化转型加速的今天&#xff0c;企业的业务运行越来越依赖于终端设备。从员工的笔记本电脑、智能手机&#xff0c;到工厂里的物联网设备、智能传感器&#xff0c;这些终端构成了企业与外部世界连接的 “神经末梢”。然而&#xff0c;随着远程办公的常态化和设备接入的爆炸式…...

【Java学习笔记】BigInteger 和 BigDecimal 类

BigInteger 和 BigDecimal 类 二者共有的常见方法 方法功能add加subtract减multiply乘divide除 注意点&#xff1a;传参类型必须是类对象 一、BigInteger 1. 作用&#xff1a;适合保存比较大的整型数 2. 使用说明 创建BigInteger对象 传入字符串 3. 代码示例 import j…...